aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-28 09:41:42 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-28 09:41:42 +0200
commitfeafa659ac4ac76cd9ac3067ca394a6470b1e59e (patch)
treeff0cb300de7cccb6ca026708e81e35c285eaa300 /vespa-http-client
parentc5adf87ecf4d6de277ad233137beeec318c869c3 (diff)
Time out connections on the IOThread level
Time out connections on the IOThread level instead of leaving this to Apache. Keep old connections alive for a while after timeout and keep polling them such that, if the old connection hits a different real behind a VIP than the new connection we'll still get the replies.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java47
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java20
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java9
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java218
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java5
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java31
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java14
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java12
12 files changed, 240 insertions, 131 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
index d623db3834c..200bedb90da 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
@@ -172,6 +172,10 @@ public final class FeedParams {
return this;
}
+ /**
+ * Sets the number of milliseconds until we respond with a timeout for a document operation
+ * if we still have not received a response.
+ */
public Builder setLocalQueueTimeOut(long timeOutMs) {
this.localQueueTimeOut = timeOutMs;
return this;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
index bc38155d07a..a08064c8ce5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
@@ -10,6 +10,8 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
/**
+ * A document operation
+ *
* @author Einar M R Rosenvinge
*/
final public class Document {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index c5864e48681..ee801270c89 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -65,6 +65,7 @@ class ApacheGatewayConnection implements GatewayConnection {
private final ConnectionParams connectionParams;
private HttpClient httpClient;
private Instant connectionTime = null;
+ private Instant lastPollTime = null;
private String sessionId;
private final String clientId;
private int negotiatedVersion = -1;
@@ -96,11 +97,20 @@ class ApacheGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException {
+ public InputStream write(List<Document> docs) throws ServerResponseException, IOException {
return write(docs, false, connectionParams.getUseCompression());
}
@Override
+ public InputStream poll() throws ServerResponseException, IOException {
+ lastPollTime = Clock.systemUTC().instant();
+ return write(Collections.<Document>emptyList(), false, false);
+ }
+
+ @Override
+ public Instant lastPollTime() { return lastPollTime; }
+
+ @Override
public InputStream drain() throws ServerResponseException, IOException {
return write(Collections.<Document>emptyList(), true, false);
}
@@ -185,11 +195,7 @@ class ApacheGatewayConnection implements GatewayConnection {
httpPost.setHeader(Headers.CLIENT_ID, clientId);
}
httpPost.setHeader(Headers.SHARDING_KEY, shardingKey);
- if (drain) {
- httpPost.setHeader(Headers.DRAIN, "true");
- } else {
- httpPost.setHeader(Headers.DRAIN, "false");
- }
+ httpPost.setHeader(Headers.DRAIN, drain ? "true" : "false");
if (clusterSpecificRoute != null) {
httpPost.setHeader(Headers.ROUTE, feedParams.getRoute());
} else {
@@ -257,18 +263,14 @@ class ApacheGatewayConnection implements GatewayConnection {
private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException {
StatusLine statusLine = response.getStatusLine();
+ int statusCode = statusLine.getStatusCode();
+
// We use code 261-299 to report errors related to internal transitive errors that the tenants should not care
// about to avoid masking more serious errors.
- int statusCode = statusLine.getStatusCode();
- if (statusCode > 199 && statusCode < 260) {
- return;
- }
- if (statusCode == 299) {
- throw new ServerResponseException(429, "Too many requests.");
- }
- String message = tryGetDetailedErrorMessage(response)
- .orElseGet(statusLine::getReasonPhrase);
- throw new ServerResponseException(statusLine.getStatusCode(), message);
+ if (statusCode > 199 && statusCode < 260) return;
+ if (statusCode == 299) throw new ServerResponseException(429, "Too many requests.");
+ throw new ServerResponseException(statusCode,
+ tryGetDetailedErrorMessage(response).orElseGet(statusLine::getReasonPhrase));
}
private static Optional<String> tryGetDetailedErrorMessage(HttpResponse response) {
@@ -292,7 +294,7 @@ class ApacheGatewayConnection implements GatewayConnection {
if (negotiatedVersion == 3) {
if (clientId == null || !clientId.equals(serverHeaderVal)) {
String message = "Running using v3. However, server responds with different session " +
- "than client has set; " + serverHeaderVal + " vs client code " + clientId;
+ "than client has set; " + serverHeaderVal + " vs client code " + clientId;
log.severe(message);
throw new ServerResponseException(message);
}
@@ -301,14 +303,12 @@ class ApacheGatewayConnection implements GatewayConnection {
if (sessionId == null) { //this must be the first request
log.finer("Got session ID from server: " + serverHeaderVal);
this.sessionId = serverHeaderVal;
- return;
} else {
if (!sessionId.equals(serverHeaderVal)) {
- log.info("Request has been routed to a server which does not recognize the client session."
- + " Most likely cause is upgrading of cluster, transitive error.");
- throw new ServerResponseException(
- "Session ID received from server ('" + serverHeaderVal
- + "') does not match cached session ID ('" + sessionId + "')");
+ log.info("Request has been routed to a server which does not recognize the client session." +
+ " Most likely cause is upgrading of cluster, transitive error.");
+ throw new ServerResponseException("Session ID received from server ('" + serverHeaderVal +
+ "') does not match cached session ID ('" + sessionId + "')");
}
}
}
@@ -415,7 +415,6 @@ class ApacheGatewayConnection implements GatewayConnection {
}
clientBuilder.setMaxConnPerRoute(1);
clientBuilder.setMaxConnTotal(1);
- clientBuilder.setConnectionTimeToLive(connectionParams.getConnectionTimeToLive().getSeconds(), TimeUnit.SECONDS);
clientBuilder.setUserAgent(String.format("vespa-http-client (%s)", Vtag.currentVersion));
clientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(Headers.CLIENT_VERSION, Vtag.currentVersion)));
clientBuilder.disableContentCompression();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index c50f9420973..319a925611a 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -14,6 +14,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.io.IOException;
import java.io.StringWriter;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -85,9 +86,10 @@ public class ClusterConnection implements AutoCloseable {
clusterId,
feedParams.getMaxChunkSizeBytes(),
maxInFlightPerSession,
- feedParams.getLocalQueueTimeOut(),
+ Duration.ofMillis(feedParams.getLocalQueueTimeOut()),
documentQueue,
feedParams.getMaxSleepTimeMs(),
+ connectionParams.getConnectionTimeToLive(),
idlePollFrequency);
ioThreads.add(ioThread);
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
index f6b3d1fb62a..ce867d001aa 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.core.Document;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -106,13 +107,13 @@ class DocumentQueue {
return previousState;
}
- Optional<Document> pollDocumentIfTimedoutInQueue(long localQueueTimeOut) {
+ Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) {
synchronized (queue) {
if (queue.isEmpty()) {
return Optional.empty();
}
Document document = queue.peek();
- if (document.timeInQueueMillis() > localQueueTimeOut) {
+ if (document.timeInQueueMillis() > localQueueTimeOut.toMillis()) {
return Optional.of(queue.poll());
}
return Optional.empty();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index f91a853c52c..02df8c52878 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -5,10 +5,8 @@ import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -25,13 +23,14 @@ public class DryRunGatewayConnection implements GatewayConnection {
private final Endpoint endpoint;
private Instant connectionTime = null;
+ private Instant lastPollTime = null;
public DryRunGatewayConnection(Endpoint endpoint) {
this.endpoint = endpoint;
}
@Override
- public InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException {
+ public InputStream write(List<Document> docs) {
StringBuilder result = new StringBuilder();
for (Document doc : docs) {
OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, "");
@@ -41,8 +40,17 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream drain() throws ServerResponseException, IOException {
- return writeOperations(new ArrayList<Document>());
+ public InputStream poll() {
+ lastPollTime = Clock.systemUTC().instant();
+ return write(new ArrayList<>());
+ }
+
+ @Override
+ public Instant lastPollTime() { return lastPollTime; }
+
+ @Override
+ public InputStream drain() {
+ return write(new ArrayList<>());
}
@Override
@@ -60,7 +68,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() throws ServerResponseException, IOException { }
+ public void handshake() { }
@Override
public void close() { }
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
index 1b205d8ee41..ce1edb83fa2 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
@@ -14,8 +14,15 @@ public interface GatewayConnection {
/** Returns the time this connected over the network, or null if not connected yet */
Instant connectionTime();
- InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException;
+ /** Returns the last time poll was called on this, or null if never */
+ Instant lastPollTime();
+ InputStream write(List<Document> docs) throws ServerResponseException, IOException;
+
+ /** Returns any operation results that are ready now */
+ InputStream poll() throws ServerResponseException, IOException;
+
+ /** Attempt to drain all outstanding operations, even if this leads to blocking */
InputStream drain() throws ServerResponseException, IOException;
boolean connect();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 9aad633bd7b..d1bb0ad9a4f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -13,8 +13,12 @@ import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@@ -33,7 +37,7 @@ class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
private final Endpoint endpoint;
- private final GatewayConnection currentConnection;
+ private final GatewayConnectionFactory connectionFactory;
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
private final Thread thread;
@@ -42,12 +46,24 @@ class IOThread implements Runnable, AutoCloseable {
private final CountDownLatch stopSignal = new CountDownLatch(1);
private final int maxChunkSizeBytes;
private final int maxInFlightRequests;
- private final long localQueueTimeOut;
+ private final Duration localQueueTimeOut;
+ private final Duration maxOldConnectionPollInterval;
private final GatewayThrottler gatewayThrottler;
+ private final Duration connectionTimeToLive;
private final long pollIntervalUS;
private final Random random = new Random();
- private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
+ private GatewayConnection currentConnection;
+
+ /**
+ * Previous connections on which we have sent operations and are still waiting for the result
+ * (so all connections in this are in state SESSION_SYNCED).
+ * We need to drain results on the connection where they were sent to make sure we request results on
+ * the node which received the operation also when going through a VIP.
+ */
+ private final List<GatewayConnection> oldConnections = new ArrayList<>();
+
+ private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0);
@@ -65,22 +81,26 @@ class IOThread implements Runnable, AutoCloseable {
int clusterId,
int maxChunkSizeBytes,
int maxInFlightRequests,
- long localQueueTimeOut,
+ Duration localQueueTimeOut,
DocumentQueue documentQueue,
long maxSleepTimeMs,
+ Duration connectionTimeToLive,
double idlePollFrequency) {
this.endpoint = endpoint;
this.documentQueue = documentQueue;
+ this.connectionFactory = connectionFactory;
this.currentConnection = connectionFactory.newConnection();
this.resultQueue = endpointResultQueue;
this.clusterId = clusterId;
this.maxChunkSizeBytes = maxChunkSizeBytes;
this.maxInFlightRequests = maxInFlightRequests;
+ this.connectionTimeToLive = connectionTimeToLive;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
+ this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10);
thread.start();
}
@@ -88,41 +108,6 @@ class IOThread implements Runnable, AutoCloseable {
return endpoint;
}
- public static class ConnectionStats {
-
- // NOTE: These fields are accessed by reflection in JSON serialization
-
- public final int wrongSessionDetectedCounter;
- public final int wrongVersionDetectedCounter;
- public final int problemStatusCodeFromServerCounter;
- public final int executeProblemsCounter;
- public final int docsReceivedCounter;
- public final int statusReceivedCounter;
- public final int pendingDocumentStatusCount;
- public final int successfullHandshakes;
- public final int lastGatewayProcessTimeMillis;
-
- ConnectionStats(int wrongSessionDetectedCounter,
- int wrongVersionDetectedCounter,
- int problemStatusCodeFromServerCounter,
- int executeProblemsCounter,
- int docsReceivedCounter,
- int statusReceivedCounter,
- int pendingDocumentStatusCount,
- int successfullHandshakes,
- int lastGatewayProcessTimeMillis) {
- this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
- this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
- this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
- this.executeProblemsCounter = executeProblemsCounter;
- this.docsReceivedCounter = docsReceivedCounter;
- this.statusReceivedCounter = statusReceivedCounter;
- this.pendingDocumentStatusCount = pendingDocumentStatusCount;
- this.successfullHandshakes = successfullHandshakes;
- this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis;
- }
- }
-
/**
* Returns a snapshot of counters. Threadsafe.
*/
@@ -236,12 +221,12 @@ class IOThread implements Runnable, AutoCloseable {
private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
- return currentConnection.writeOperations(docs);
+ return currentConnection.write(docs);
} catch (ServerResponseException ser) {
markDocumentAsFailed(docs, ser);
throw ser;
} catch (Exception e) {
- markDocumentAsFailed(docs, new ServerResponseException(e.getMessage()));
+ markDocumentAsFailed(docs, new ServerResponseException(Exceptions.toMessageString(e)));
throw e;
}
}
@@ -309,27 +294,29 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- /** Given a current thread state, take the appropriate action and return the resulting new thread state */
- private ThreadState cycle(ThreadState threadState) {
- switch(threadState) {
+ /** Given a current connection state, take the appropriate action and return the resulting new connection state */
+ private ConnectionState cycle(ConnectionState connectionState) {
+ switch(connectionState) {
case DISCONNECTED:
try {
if (! currentConnection.connect()) {
log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try.");
drainFirstDocumentsInQueueIfOld();
- return ThreadState.DISCONNECTED;
+ return ConnectionState.DISCONNECTED;
}
- return ThreadState.CONNECTED;
+ return ConnectionState.CONNECTED;
} catch (Throwable throwable1) {
drainFirstDocumentsInQueueIfOld();
log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint
+ "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1);
executeProblemsCounter.incrementAndGet();
- return ThreadState.DISCONNECTED;
+ return ConnectionState.DISCONNECTED;
}
case CONNECTED:
try {
+ if (isStale(currentConnection))
+ return refreshConnection(connectionState);
currentConnection.handshake();
successfulHandshakes.getAndIncrement();
} catch (ServerResponseException ser) {
@@ -340,7 +327,7 @@ class IOThread implements Runnable, AutoCloseable {
drainFirstDocumentsInQueueIfOld();
resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
- return ThreadState.CONNECTED;
+ return ConnectionState.CONNECTED;
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
@@ -348,38 +335,53 @@ class IOThread implements Runnable, AutoCloseable {
+ "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
drainFirstDocumentsInQueueIfOld();
currentConnection.close();
- return ThreadState.DISCONNECTED;
+ return ConnectionState.DISCONNECTED;
}
- return ThreadState.SESSION_SYNCED;
+ return ConnectionState.SESSION_SYNCED;
case SESSION_SYNCED:
try {
+ if (isStale(currentConnection))
+ return refreshConnection(connectionState);
ProcessResponse processResponse = pullAndProcessData(pollIntervalUS);
gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
}
catch (ServerResponseException ser) {
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint
- + "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '"
- + Exceptions.toMessageString(ser) + "'",ser);
- return ThreadState.CONNECTED;
+ log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
+ "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '"
+ + Exceptions.toMessageString(ser) + "'",ser);
+ return ConnectionState.CONNECTED;
}
- catch (Throwable e) { // Covers IOException as well
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint
- + "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e);
+ catch (Throwable e) {
+ log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
+ "'. Will re-try. Connection level error. Failed with '" +
+ Exceptions.toMessageString(e) + "'", e);
currentConnection.close();
- return ThreadState.DISCONNECTED;
+ return ConnectionState.DISCONNECTED;
}
- return ThreadState.SESSION_SYNCED;
+ return ConnectionState.SESSION_SYNCED;
default: {
log.severe("Should never get here.");
currentConnection.close();
- return ThreadState.DISCONNECTED;
+ return ConnectionState.DISCONNECTED;
}
}
}
- private void sleepIfProblemsGettingSyncedConnection(ThreadState newState, ThreadState oldState) {
- if (newState == ThreadState.SESSION_SYNCED) return;
- if (newState == ThreadState.CONNECTED && oldState == ThreadState.DISCONNECTED) return;
+ private boolean isStale(GatewayConnection connection) {
+ return connection.connectionTime() != null
+ && connection.connectionTime().plus(connectionTimeToLive).isBefore(Clock.systemUTC().instant());
+ }
+
+ private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
+ if (currentConnectionState == ConnectionState.SESSION_SYNCED)
+ oldConnections.add(currentConnection);
+ currentConnection = connectionFactory.newConnection();
+ return ConnectionState.DISCONNECTED;
+ }
+
+ private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) {
+ if (newState == ConnectionState.SESSION_SYNCED) return;
+ if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return;
try {
// Take it easy we have problems getting a connection up.
if (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
@@ -391,11 +393,12 @@ class IOThread implements Runnable, AutoCloseable {
@Override
public void run() {
- ThreadState threadState = ThreadState.DISCONNECTED;
+ ConnectionState connectionState = ConnectionState.DISCONNECTED;
while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
- ThreadState oldState = threadState;
- threadState = cycle(threadState);
- sleepIfProblemsGettingSyncedConnection(threadState, oldState);
+ ConnectionState oldState = connectionState;
+ connectionState = cycle(connectionState);
+ checkOldConnections();
+ sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
}
log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
@@ -410,8 +413,8 @@ class IOThread implements Runnable, AutoCloseable {
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
- new Exception("Not sending document operation, timed out in queue after "
- + document.get().timeInQueueMillis() + " ms."));
+ new Exception("Not sending document operation, timed out in queue after " +
+ document.get().timeInQueueMillis() + " ms."));
resultQueue.failOperation(endpointResult, clusterId);
}
}
@@ -427,4 +430,81 @@ class IOThread implements Runnable, AutoCloseable {
}
}
+ private void checkOldConnections() {
+ if (resultQueue.getPendingSize() == 0) {
+ oldConnections.forEach(c -> c.close());
+ oldConnections.clear();
+ return;
+ }
+
+ for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
+ GatewayConnection connection = i.next();
+ if (closingTime(connection).isBefore(Clock.systemUTC().instant())) {
+ connection.close();
+ i.remove();;
+ }
+ else if (timeToPoll(connection)) {
+ try {
+ processResponse(connection.poll());
+ }
+ catch (Exception e) {
+ // Old connection; this may be ok
+ }
+ }
+
+ }
+ }
+
+ private Instant closingTime(GatewayConnection connection) {
+ return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
+ }
+
+ private boolean timeToPoll(GatewayConnection connection) {
+ if (connection.lastPollTime() == null) return true;
+
+ // Poll less the closer the connection comes to closing time
+ double newness = ( closingTime(connection).toEpochMilli() - Clock.systemUTC().instant().toEpochMilli() ) /
+ (double)localQueueTimeOut.toMillis();
+ if (newness < 0) return true; // connection retired prematurely
+ if (newness > 1) return false; // closing time reached
+ Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 +
+ (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000)));
+ return connection.lastPollTime().plus(pollInterval).isBefore(Clock.systemUTC().instant());
+ }
+
+ public static class ConnectionStats {
+
+ // NOTE: These fields are accessed by reflection in JSON serialization
+
+ public final int wrongSessionDetectedCounter;
+ public final int wrongVersionDetectedCounter;
+ public final int problemStatusCodeFromServerCounter;
+ public final int executeProblemsCounter;
+ public final int docsReceivedCounter;
+ public final int statusReceivedCounter;
+ public final int pendingDocumentStatusCount;
+ public final int successfullHandshakes;
+ public final int lastGatewayProcessTimeMillis;
+
+ ConnectionStats(int wrongSessionDetectedCounter,
+ int wrongVersionDetectedCounter,
+ int problemStatusCodeFromServerCounter,
+ int executeProblemsCounter,
+ int docsReceivedCounter,
+ int statusReceivedCounter,
+ int pendingDocumentStatusCount,
+ int successfullHandshakes,
+ int lastGatewayProcessTimeMillis) {
+ this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
+ this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
+ this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
+ this.executeProblemsCounter = executeProblemsCounter;
+ this.docsReceivedCounter = docsReceivedCounter;
+ this.statusReceivedCounter = statusReceivedCounter;
+ this.pendingDocumentStatusCount = pendingDocumentStatusCount;
+ this.successfullHandshakes = successfullHandshakes;
+ this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis;
+ }
+ }
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
index 1f875e0dd72..8ff2566ead1 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
@@ -24,6 +24,7 @@ import static com.yahoo.vespa.http.client.TestUtils.writeDocument;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -249,12 +250,12 @@ public class QueueBoundsTest {
}
int errors = 0;
for (Result result : session.results()) {
- assertThat(result.getDetails().size(), is(1));
+ assertEquals(1, result.getDetails().size());
if (! result.isSuccess()) {
errors++;
}
}
- assertThat(errors, is(1));
+ assertEquals(1, errors);
} finally {
feeder.stop();
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
index 1d70ce953e4..6907e24009b 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
@@ -21,7 +21,11 @@ import static com.yahoo.vespa.http.client.TestUtils.getResults;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
*
@@ -79,16 +83,15 @@ public class V3HttpAPITest {
writeDocument(session);
Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
+ assertEquals(1, results.size());
TestDocument document = documents.get(0);
Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- if (conditionNotMet) {
- assertThat(r.getDetails().iterator().next().getResultType(), is(Result.ResultType.CONDITION_NOT_MET));
- }
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(results.isEmpty(), is(true));
+ assertNotNull(r);
+ if (conditionNotMet)
+ assertEquals(Result.ResultType.CONDITION_NOT_MET, r.getDetails().iterator().next().getResultType());
+ assertFalse(r.getDetails().toString(), r.isSuccess());
+ assertTrue(results.isEmpty());
}
}
@@ -99,14 +102,14 @@ public class V3HttpAPITest {
writeDocuments(session);
Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
+ assertEquals(documents.size(), results.size());
for (TestDocument document : documents) {
Result r = results.remove(document.getDocumentId());
assertThat(r, not(nullValue()));
assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
}
- assertThat(results.isEmpty(), is(true));
+ assertTrue(results.isEmpty());
}
}
@@ -169,15 +172,15 @@ public class V3HttpAPITest {
writeDocuments(session);
Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
+ assertEquals(documents.size(), results.size());
for (TestDocument document : documents) {
Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().iterator().next().getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
+ assertNotNull(r);
+ assertFalse(r.getDetails().toString(), r.isSuccess());
+ assertEquals(Result.ResultType.TRANSITIVE_ERROR, r.getDetails().iterator().next().getResultType());
}
- assertThat(results.isEmpty(), is(true));
+ assertTrue(results.isEmpty());
}
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
index 494f901d8d7..4169e7e0ecf 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -76,7 +76,7 @@ public class ApacheGatewayConnectionTest {
apacheGatewayConnection.handshake();
documents.add(createDoc(docId, vespaDocContent, true));
- apacheGatewayConnection.writeOperations(documents);
+ apacheGatewayConnection.write(documents);
}
@Test(expected=IllegalArgumentException.class)
@@ -100,7 +100,7 @@ public class ApacheGatewayConnectionTest {
"clientId");
apacheGatewayConnection.connect();
final List<Document> documents = new ArrayList<>();
- apacheGatewayConnection.writeOperations(documents);
+ apacheGatewayConnection.write(documents);
}
@Test
@@ -145,7 +145,7 @@ public class ApacheGatewayConnectionTest {
documents.add(createDoc(docId, vespaDocContent, true));
- apacheGatewayConnection.writeOperations(documents);
+ apacheGatewayConnection.write(documents);
}
@Test
@@ -210,7 +210,7 @@ public class ApacheGatewayConnectionTest {
documents.add(doc);
- apacheGatewayConnection.writeOperations(documents);
+ apacheGatewayConnection.write(documents);
}
@Test
@@ -248,8 +248,8 @@ public class ApacheGatewayConnectionTest {
List<Document> documents = new ArrayList<>();
documents.add(createDoc("42", "content", true));
- apacheGatewayConnection.writeOperations(documents);
- apacheGatewayConnection.writeOperations(documents);
+ apacheGatewayConnection.write(documents);
+ apacheGatewayConnection.write(documents);
verify(headerProvider, times(3)).getHeaderValue(); // 1x connect(), 2x writeOperations()
}
@@ -274,7 +274,7 @@ public class ApacheGatewayConnectionTest {
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
- apacheGatewayConnection.writeOperations(Collections.singletonList(createDoc("42", "content", true)));
+ apacheGatewayConnection.write(Collections.singletonList(createDoc("42", "content", true)));
}
private static ApacheGatewayConnection.HttpClientFactory mockHttpClientFactory(HttpExecuteMock httpExecuteMock) throws IOException {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index b313daa12cd..75df8a78b86 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -16,6 +16,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -90,9 +91,10 @@ public class IOThreadTest {
0,
0,
maxInFlightRequests,
- localQueueTimeOut,
+ Duration.ofMillis(localQueueTimeOut),
documentQueue,
0,
+ Duration.ofSeconds(15),
10);
}
@@ -101,7 +103,7 @@ public class IOThreadTest {
when(apacheGatewayConnection.connect()).thenReturn(true);
InputStream serverResponse = new ByteArrayInputStream(
(docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.writeOperations(any())).thenReturn(serverResponse);
+ when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
try (IOThread ioThread = createIOThread(10000, 10000)) {
ioThread.post(doc1);
@@ -112,7 +114,7 @@ public class IOThreadTest {
@Test
public void requireThatSingleDocumentWriteErrorIsHandledProperly() throws Exception {
when(apacheGatewayConnection.connect()).thenReturn(true);
- when(apacheGatewayConnection.writeOperations(any())).thenThrow(new IOException(exceptionMessage));
+ when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage));
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
try (IOThread ioThread = createIOThread(10000, 10000)) {
ioThread.post(doc1);
@@ -125,7 +127,7 @@ public class IOThreadTest {
when(apacheGatewayConnection.connect()).thenReturn(true);
InputStream serverResponse = new ByteArrayInputStream(
(docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.writeOperations(any()))
+ when(apacheGatewayConnection.write(any()))
.thenThrow(new IOException(exceptionMessage))
.thenReturn(serverResponse);
latch = new CountDownLatch(2);
@@ -143,7 +145,7 @@ public class IOThreadTest {
when(apacheGatewayConnection.connect()).thenReturn(false);
InputStream serverResponse = new ByteArrayInputStream(
("").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.writeOperations(any()))
+ when(apacheGatewayConnection.write(any()))
.thenReturn(serverResponse);
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
"java.lang.Exception: Not sending document operation, timed out in queue after");