aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/main/java/com/yahoo')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java45
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java38
4 files changed, 55 insertions, 30 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
index d3e4b4ed762..3a67c80224f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
@@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.config.Endpoint;
* @author bjorncs
*/
public abstract class FeedEndpointException extends RuntimeException {
+
private final Endpoint endpoint;
protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 129fc000271..623ea543ffb 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -5,8 +5,10 @@ import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -29,7 +31,13 @@ public class DryRunGatewayConnection implements GatewayConnection {
/** Set to true to hold off responding with a result to any incoming operations until this is set false */
private boolean hold = false;
- private List<Document> held = new ArrayList<>();
+ private final List<Document> held = new ArrayList<>();
+
+ /** If this is set, handshake operations will throw this exception */
+ private ServerResponseException throwThisOnHandshake = null;
+
+ /** If this is set, all write operations will throw this exception */
+ private IOException throwThisOnWrite = null;
public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
@@ -37,27 +45,27 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream write(List<Document> docs) {
- StringBuilder result = new StringBuilder();
+ public InputStream write(List<Document> docs) throws IOException {
+ if (throwThisOnWrite != null)
+ throw throwThisOnWrite;
+
if (hold) {
held.addAll(docs);
+ return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
}
else {
+ StringBuilder result = new StringBuilder();
for (Document doc : held)
result.append(okResponse(doc).render());
held.clear();
for (Document doc : docs)
result.append(okResponse(doc).render());
+ return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
}
- return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
- }
-
- public void hold(boolean hold) {
- this.hold = hold;
}
@Override
- public InputStream poll() {
+ public InputStream poll() throws IOException {
lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@@ -66,7 +74,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
public Instant lastPollTime() { return lastPollTime; }
@Override
- public InputStream drain() {
+ public InputStream drain() throws IOException {
return write(new ArrayList<>());
}
@@ -85,14 +93,29 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() { }
+ public void handshake() throws ServerResponseException {
+ if (throwThisOnHandshake != null)
+ throw throwThisOnHandshake;
+ }
@Override
public void close() { }
+ public void hold(boolean hold) {
+ this.hold = hold;
+ }
+
/** Returns the document currently held in this */
public List<Document> held() { return Collections.unmodifiableList(held); }
+ public void throwOnWrite(IOException throwThisOnWrite) {
+ this.throwThisOnWrite = throwThisOnWrite;
+ }
+
+ public void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
+ this.throwThisOnHandshake = throwThisOnHandshake;
+ }
+
private OperationStatus okResponse(Document document) {
return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index 1dd8b3bf3ec..a5a37a31665 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -65,7 +65,6 @@ class EndpointResultQueue {
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
-
TimerFuture timerFuture = futureByOperation.remove(result.getOperationId());
if (timerFuture == null) {
if (duplicateGivesWarning) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8f052a1b3f6..4796f4bb4a8 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -51,7 +51,6 @@ class IOThread implements Runnable, AutoCloseable {
private final int maxChunkSizeBytes;
private final int maxInFlightRequests;
private final Duration localQueueTimeOut;
- private final Duration maxOldConnectionPollInterval;
private final GatewayThrottler gatewayThrottler;
private final Duration connectionTimeToLive;
private final long pollIntervalUS;
@@ -107,9 +106,6 @@ class IOThread implements Runnable, AutoCloseable {
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
- this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000
- ? localQueueTimeOut.dividedBy(10)
- : Duration.ofMillis(pollIntervalUS / 1000);
if (runThreads) {
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
@@ -231,9 +227,10 @@ class IOThread implements Runnable, AutoCloseable {
private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
- resultQueue.failOperation(
- EndPointResultFactory.createTransientError(
- endpoint, doc.getOperationId(), servletException), clusterId);
+ resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint,
+ doc.getOperationId(),
+ servletException),
+ clusterId);
}
}
@@ -327,8 +324,8 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable1) {
drainFirstDocumentsInQueueIfOld();
- log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint
- + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1);
+ log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.",
+ throwable1);
executeProblemsCounter.incrementAndGet();
return ConnectionState.DISCONNECTED;
}
@@ -341,8 +338,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(ser) + "'",ser);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint +
+ "' failed. Will re-try handshake.",
+ ser);
drainFirstDocumentsInQueueIfOld();
resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
@@ -350,8 +348,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" +
+ endpoint + "' failed. Will re-try handshake.",
+ throwable);
drainFirstDocumentsInQueueIfOld();
currentConnection.close();
return ConnectionState.DISCONNECTED;
@@ -366,14 +365,14 @@ class IOThread implements Runnable, AutoCloseable {
}
catch (ServerResponseException ser) {
log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '"
- + Exceptions.toMessageString(ser) + "'",ser);
+ "'. Will re-try. Endpoint responded with an unexpected HTTP response code.",
+ ser);
return ConnectionState.CONNECTED;
}
catch (Throwable e) {
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Connection level error. Failed with '" +
- Exceptions.toMessageString(e) + "'", e);
+ log.log(Level.INFO,
+ "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.",
+ e);
currentConnection.close();
return ConnectionState.DISCONNECTED;
}
@@ -532,4 +531,7 @@ class IOThread implements Runnable, AutoCloseable {
/** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+ /** For testing */
+ public EndpointResultQueue resultQueue() { return resultQueue; }
+
}