diff options
Diffstat (limited to 'vespa-http-client/src/main/java/com/yahoo')
4 files changed, 55 insertions, 30 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java index d3e4b4ed762..3a67c80224f 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java @@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.config.Endpoint; * @author bjorncs */ public abstract class FeedEndpointException extends RuntimeException { + private final Endpoint endpoint; protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 129fc000271..623ea543ffb 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -5,8 +5,10 @@ import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.ErrorCode; import com.yahoo.vespa.http.client.core.OperationStatus; +import com.yahoo.vespa.http.client.core.ServerResponseException; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -29,7 +31,13 @@ public class DryRunGatewayConnection implements GatewayConnection { /** Set to true to hold off responding with a result to any incoming operations until this is set false */ private boolean hold = false; - private List<Document> held = new ArrayList<>(); + private final List<Document> held = new ArrayList<>(); + + /** If this is set, handshake operations will throw this exception */ + private ServerResponseException throwThisOnHandshake = null; + + /** If this is set, all write operations will throw this exception */ + private IOException throwThisOnWrite = null; public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; @@ -37,27 +45,27 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream write(List<Document> docs) { - StringBuilder result = new StringBuilder(); + public InputStream write(List<Document> docs) throws IOException { + if (throwThisOnWrite != null) + throw throwThisOnWrite; + if (hold) { held.addAll(docs); + return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)); } else { + StringBuilder result = new StringBuilder(); for (Document doc : held) result.append(okResponse(doc).render()); held.clear(); for (Document doc : docs) result.append(okResponse(doc).render()); + return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); } - return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); - } - - public void hold(boolean hold) { - this.hold = hold; } @Override - public InputStream poll() { + public InputStream poll() throws IOException { lastPollTime = clock.instant(); return write(new ArrayList<>()); } @@ -66,7 +74,7 @@ public class DryRunGatewayConnection implements GatewayConnection { public Instant lastPollTime() { return lastPollTime; } @Override - public InputStream drain() { + public InputStream drain() throws IOException { return write(new ArrayList<>()); } @@ -85,14 +93,29 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() { } + public void handshake() throws ServerResponseException { + if (throwThisOnHandshake != null) + throw throwThisOnHandshake; + } @Override public void close() { } + public void hold(boolean hold) { + this.hold = hold; + } + /** Returns the document currently held in this */ public List<Document> held() { return Collections.unmodifiableList(held); } + public void throwOnWrite(IOException throwThisOnWrite) { + this.throwThisOnWrite = throwThisOnWrite; + } + + public void throwOnHandshake(ServerResponseException throwThisOnHandshake) { + this.throwThisOnHandshake = throwThisOnHandshake; + } + private OperationStatus okResponse(Document document) { return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, ""); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index 1dd8b3bf3ec..a5a37a31665 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -65,7 +65,6 @@ class EndpointResultQueue { private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); - TimerFuture timerFuture = futureByOperation.remove(result.getOperationId()); if (timerFuture == null) { if (duplicateGivesWarning) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 8f052a1b3f6..4796f4bb4a8 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -51,7 +51,6 @@ class IOThread implements Runnable, AutoCloseable { private final int maxChunkSizeBytes; private final int maxInFlightRequests; private final Duration localQueueTimeOut; - private final Duration maxOldConnectionPollInterval; private final GatewayThrottler gatewayThrottler; private final Duration connectionTimeToLive; private final long pollIntervalUS; @@ -107,9 +106,6 @@ class IOThread implements Runnable, AutoCloseable { this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; this.localQueueTimeOut = localQueueTimeOut; - this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000 - ? localQueueTimeOut.dividedBy(10) - : Duration.ofMillis(pollIntervalUS / 1000); if (runThreads) { this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); @@ -231,9 +227,10 @@ class IOThread implements Runnable, AutoCloseable { private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { for (Document doc : docs) { - resultQueue.failOperation( - EndPointResultFactory.createTransientError( - endpoint, doc.getOperationId(), servletException), clusterId); + resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint, + doc.getOperationId(), + servletException), + clusterId); } } @@ -327,8 +324,8 @@ class IOThread implements Runnable, AutoCloseable { } catch (Throwable throwable1) { drainFirstDocumentsInQueueIfOld(); - log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint - + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1); + log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.", + throwable1); executeProblemsCounter.incrementAndGet(); return ConnectionState.DISCONNECTED; } @@ -341,8 +338,9 @@ class IOThread implements Runnable, AutoCloseable { } catch (ServerResponseException ser) { executeProblemsCounter.incrementAndGet(); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint - + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(ser) + "'",ser); + log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint + + "' failed. Will re-try handshake.", + ser); drainFirstDocumentsInQueueIfOld(); resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint)); @@ -350,8 +348,9 @@ class IOThread implements Runnable, AutoCloseable { } catch (Throwable throwable) { // This cover IOException as well executeProblemsCounter.incrementAndGet(); resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint)); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint - + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable); + log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + + endpoint + "' failed. Will re-try handshake.", + throwable); drainFirstDocumentsInQueueIfOld(); currentConnection.close(); return ConnectionState.DISCONNECTED; @@ -366,14 +365,14 @@ class IOThread implements Runnable, AutoCloseable { } catch (ServerResponseException ser) { log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + - "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '" - + Exceptions.toMessageString(ser) + "'",ser); + "'. Will re-try. Endpoint responded with an unexpected HTTP response code.", + ser); return ConnectionState.CONNECTED; } catch (Throwable e) { - log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + - "'. Will re-try. Connection level error. Failed with '" + - Exceptions.toMessageString(e) + "'", e); + log.log(Level.INFO, + "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.", + e); currentConnection.close(); return ConnectionState.DISCONNECTED; } @@ -532,4 +531,7 @@ class IOThread implements Runnable, AutoCloseable { /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } + /** For testing */ + public EndpointResultQueue resultQueue() { return resultQueue; } + } |