diff options
Diffstat (limited to 'vespa-http-client/src/main/java/com/yahoo/vespa/http')
8 files changed, 206 insertions, 103 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java index d623db3834c..200bedb90da 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java @@ -172,6 +172,10 @@ public final class FeedParams { return this; } + /** + * Sets the number of milliseconds until we respond with a timeout for a document operation + * if we still have not received a response. + */ public Builder setLocalQueueTimeOut(long timeOutMs) { this.localQueueTimeOut = timeOutMs; return this; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java index bc38155d07a..a08064c8ce5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java @@ -10,6 +10,8 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ThreadLocalRandom; /** + * A document operation + * * @author Einar M R Rosenvinge */ final public class Document { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index c5864e48681..ee801270c89 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -65,6 +65,7 @@ class ApacheGatewayConnection implements GatewayConnection { private final ConnectionParams connectionParams; private HttpClient httpClient; private Instant connectionTime = null; + private Instant lastPollTime = null; private String sessionId; private final String clientId; private int negotiatedVersion = -1; @@ -96,11 +97,20 @@ class ApacheGatewayConnection implements GatewayConnection { } @Override - public InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException { + public InputStream write(List<Document> docs) throws ServerResponseException, IOException { return write(docs, false, connectionParams.getUseCompression()); } @Override + public InputStream poll() throws ServerResponseException, IOException { + lastPollTime = Clock.systemUTC().instant(); + return write(Collections.<Document>emptyList(), false, false); + } + + @Override + public Instant lastPollTime() { return lastPollTime; } + + @Override public InputStream drain() throws ServerResponseException, IOException { return write(Collections.<Document>emptyList(), true, false); } @@ -185,11 +195,7 @@ class ApacheGatewayConnection implements GatewayConnection { httpPost.setHeader(Headers.CLIENT_ID, clientId); } httpPost.setHeader(Headers.SHARDING_KEY, shardingKey); - if (drain) { - httpPost.setHeader(Headers.DRAIN, "true"); - } else { - httpPost.setHeader(Headers.DRAIN, "false"); - } + httpPost.setHeader(Headers.DRAIN, drain ? "true" : "false"); if (clusterSpecificRoute != null) { httpPost.setHeader(Headers.ROUTE, feedParams.getRoute()); } else { @@ -257,18 +263,14 @@ class ApacheGatewayConnection implements GatewayConnection { private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException { StatusLine statusLine = response.getStatusLine(); + int statusCode = statusLine.getStatusCode(); + // We use code 261-299 to report errors related to internal transitive errors that the tenants should not care // about to avoid masking more serious errors. - int statusCode = statusLine.getStatusCode(); - if (statusCode > 199 && statusCode < 260) { - return; - } - if (statusCode == 299) { - throw new ServerResponseException(429, "Too many requests."); - } - String message = tryGetDetailedErrorMessage(response) - .orElseGet(statusLine::getReasonPhrase); - throw new ServerResponseException(statusLine.getStatusCode(), message); + if (statusCode > 199 && statusCode < 260) return; + if (statusCode == 299) throw new ServerResponseException(429, "Too many requests."); + throw new ServerResponseException(statusCode, + tryGetDetailedErrorMessage(response).orElseGet(statusLine::getReasonPhrase)); } private static Optional<String> tryGetDetailedErrorMessage(HttpResponse response) { @@ -292,7 +294,7 @@ class ApacheGatewayConnection implements GatewayConnection { if (negotiatedVersion == 3) { if (clientId == null || !clientId.equals(serverHeaderVal)) { String message = "Running using v3. However, server responds with different session " + - "than client has set; " + serverHeaderVal + " vs client code " + clientId; + "than client has set; " + serverHeaderVal + " vs client code " + clientId; log.severe(message); throw new ServerResponseException(message); } @@ -301,14 +303,12 @@ class ApacheGatewayConnection implements GatewayConnection { if (sessionId == null) { //this must be the first request log.finer("Got session ID from server: " + serverHeaderVal); this.sessionId = serverHeaderVal; - return; } else { if (!sessionId.equals(serverHeaderVal)) { - log.info("Request has been routed to a server which does not recognize the client session." - + " Most likely cause is upgrading of cluster, transitive error."); - throw new ServerResponseException( - "Session ID received from server ('" + serverHeaderVal - + "') does not match cached session ID ('" + sessionId + "')"); + log.info("Request has been routed to a server which does not recognize the client session." + + " Most likely cause is upgrading of cluster, transitive error."); + throw new ServerResponseException("Session ID received from server ('" + serverHeaderVal + + "') does not match cached session ID ('" + sessionId + "')"); } } } @@ -415,7 +415,6 @@ class ApacheGatewayConnection implements GatewayConnection { } clientBuilder.setMaxConnPerRoute(1); clientBuilder.setMaxConnTotal(1); - clientBuilder.setConnectionTimeToLive(connectionParams.getConnectionTimeToLive().getSeconds(), TimeUnit.SECONDS); clientBuilder.setUserAgent(String.format("vespa-http-client (%s)", Vtag.currentVersion)); clientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(Headers.CLIENT_VERSION, Vtag.currentVersion))); clientBuilder.disableContentCompression(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index c50f9420973..319a925611a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -14,6 +14,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.io.IOException; import java.io.StringWriter; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -85,9 +86,10 @@ public class ClusterConnection implements AutoCloseable { clusterId, feedParams.getMaxChunkSizeBytes(), maxInFlightPerSession, - feedParams.getLocalQueueTimeOut(), + Duration.ofMillis(feedParams.getLocalQueueTimeOut()), documentQueue, feedParams.getMaxSleepTimeMs(), + connectionParams.getConnectionTimeToLive(), idlePollFrequency); ioThreads.add(ioThread); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java index f6b3d1fb62a..ce867d001aa 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.core.Document; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -106,13 +107,13 @@ class DocumentQueue { return previousState; } - Optional<Document> pollDocumentIfTimedoutInQueue(long localQueueTimeOut) { + Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) { synchronized (queue) { if (queue.isEmpty()) { return Optional.empty(); } Document document = queue.peek(); - if (document.timeInQueueMillis() > localQueueTimeOut) { + if (document.timeInQueueMillis() > localQueueTimeOut.toMillis()) { return Optional.of(queue.poll()); } return Optional.empty(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index f91a853c52c..02df8c52878 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -5,10 +5,8 @@ import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.ErrorCode; import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespa.http.client.core.ServerResponseException; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -25,13 +23,14 @@ public class DryRunGatewayConnection implements GatewayConnection { private final Endpoint endpoint; private Instant connectionTime = null; + private Instant lastPollTime = null; public DryRunGatewayConnection(Endpoint endpoint) { this.endpoint = endpoint; } @Override - public InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException { + public InputStream write(List<Document> docs) { StringBuilder result = new StringBuilder(); for (Document doc : docs) { OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, ""); @@ -41,8 +40,17 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream drain() throws ServerResponseException, IOException { - return writeOperations(new ArrayList<Document>()); + public InputStream poll() { + lastPollTime = Clock.systemUTC().instant(); + return write(new ArrayList<>()); + } + + @Override + public Instant lastPollTime() { return lastPollTime; } + + @Override + public InputStream drain() { + return write(new ArrayList<>()); } @Override @@ -60,7 +68,7 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() throws ServerResponseException, IOException { } + public void handshake() { } @Override public void close() { } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java index 1b205d8ee41..ce1edb83fa2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java @@ -14,8 +14,15 @@ public interface GatewayConnection { /** Returns the time this connected over the network, or null if not connected yet */ Instant connectionTime(); - InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException; + /** Returns the last time poll was called on this, or null if never */ + Instant lastPollTime(); + InputStream write(List<Document> docs) throws ServerResponseException, IOException; + + /** Returns any operation results that are ready now */ + InputStream poll() throws ServerResponseException, IOException; + + /** Attempt to drain all outstanding operations, even if this leads to blocking */ InputStream drain() throws ServerResponseException, IOException; boolean connect(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 9aad633bd7b..d1bb0ad9a4f 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -13,8 +13,12 @@ import com.yahoo.vespa.http.client.core.ServerResponseException; import java.io.IOException; import java.io.InputStream; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Random; @@ -33,7 +37,7 @@ class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); private final Endpoint endpoint; - private final GatewayConnection currentConnection; + private final GatewayConnectionFactory connectionFactory; private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; private final Thread thread; @@ -42,12 +46,24 @@ class IOThread implements Runnable, AutoCloseable { private final CountDownLatch stopSignal = new CountDownLatch(1); private final int maxChunkSizeBytes; private final int maxInFlightRequests; - private final long localQueueTimeOut; + private final Duration localQueueTimeOut; + private final Duration maxOldConnectionPollInterval; private final GatewayThrottler gatewayThrottler; + private final Duration connectionTimeToLive; private final long pollIntervalUS; private final Random random = new Random(); - private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; + private GatewayConnection currentConnection; + + /** + * Previous connections on which we have sent operations and are still waiting for the result + * (so all connections in this are in state SESSION_SYNCED). + * We need to drain results on the connection where they were sent to make sure we request results on + * the node which received the operation also when going through a VIP. + */ + private final List<GatewayConnection> oldConnections = new ArrayList<>(); + + private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0); private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0); @@ -65,22 +81,26 @@ class IOThread implements Runnable, AutoCloseable { int clusterId, int maxChunkSizeBytes, int maxInFlightRequests, - long localQueueTimeOut, + Duration localQueueTimeOut, DocumentQueue documentQueue, long maxSleepTimeMs, + Duration connectionTimeToLive, double idlePollFrequency) { this.endpoint = endpoint; this.documentQueue = documentQueue; + this.connectionFactory = connectionFactory; this.currentConnection = connectionFactory.newConnection(); this.resultQueue = endpointResultQueue; this.clusterId = clusterId; this.maxChunkSizeBytes = maxChunkSizeBytes; this.maxInFlightRequests = maxInFlightRequests; + this.connectionTimeToLive = connectionTimeToLive; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; + this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10); thread.start(); } @@ -88,41 +108,6 @@ class IOThread implements Runnable, AutoCloseable { return endpoint; } - public static class ConnectionStats { - - // NOTE: These fields are accessed by reflection in JSON serialization - - public final int wrongSessionDetectedCounter; - public final int wrongVersionDetectedCounter; - public final int problemStatusCodeFromServerCounter; - public final int executeProblemsCounter; - public final int docsReceivedCounter; - public final int statusReceivedCounter; - public final int pendingDocumentStatusCount; - public final int successfullHandshakes; - public final int lastGatewayProcessTimeMillis; - - ConnectionStats(int wrongSessionDetectedCounter, - int wrongVersionDetectedCounter, - int problemStatusCodeFromServerCounter, - int executeProblemsCounter, - int docsReceivedCounter, - int statusReceivedCounter, - int pendingDocumentStatusCount, - int successfullHandshakes, - int lastGatewayProcessTimeMillis) { - this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; - this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; - this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; - this.executeProblemsCounter = executeProblemsCounter; - this.docsReceivedCounter = docsReceivedCounter; - this.statusReceivedCounter = statusReceivedCounter; - this.pendingDocumentStatusCount = pendingDocumentStatusCount; - this.successfullHandshakes = successfullHandshakes; - this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis; - } - } - /** * Returns a snapshot of counters. Threadsafe. */ @@ -236,12 +221,12 @@ class IOThread implements Runnable, AutoCloseable { private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. - return currentConnection.writeOperations(docs); + return currentConnection.write(docs); } catch (ServerResponseException ser) { markDocumentAsFailed(docs, ser); throw ser; } catch (Exception e) { - markDocumentAsFailed(docs, new ServerResponseException(e.getMessage())); + markDocumentAsFailed(docs, new ServerResponseException(Exceptions.toMessageString(e))); throw e; } } @@ -309,27 +294,29 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - /** Given a current thread state, take the appropriate action and return the resulting new thread state */ - private ThreadState cycle(ThreadState threadState) { - switch(threadState) { + /** Given a current connection state, take the appropriate action and return the resulting new connection state */ + private ConnectionState cycle(ConnectionState connectionState) { + switch(connectionState) { case DISCONNECTED: try { if (! currentConnection.connect()) { log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try."); drainFirstDocumentsInQueueIfOld(); - return ThreadState.DISCONNECTED; + return ConnectionState.DISCONNECTED; } - return ThreadState.CONNECTED; + return ConnectionState.CONNECTED; } catch (Throwable throwable1) { drainFirstDocumentsInQueueIfOld(); log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1); executeProblemsCounter.incrementAndGet(); - return ThreadState.DISCONNECTED; + return ConnectionState.DISCONNECTED; } case CONNECTED: try { + if (isStale(currentConnection)) + return refreshConnection(connectionState); currentConnection.handshake(); successfulHandshakes.getAndIncrement(); } catch (ServerResponseException ser) { @@ -340,7 +327,7 @@ class IOThread implements Runnable, AutoCloseable { drainFirstDocumentsInQueueIfOld(); resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint)); - return ThreadState.CONNECTED; + return ConnectionState.CONNECTED; } catch (Throwable throwable) { // This cover IOException as well executeProblemsCounter.incrementAndGet(); resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint)); @@ -348,38 +335,53 @@ class IOThread implements Runnable, AutoCloseable { + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable); drainFirstDocumentsInQueueIfOld(); currentConnection.close(); - return ThreadState.DISCONNECTED; + return ConnectionState.DISCONNECTED; } - return ThreadState.SESSION_SYNCED; + return ConnectionState.SESSION_SYNCED; case SESSION_SYNCED: try { + if (isStale(currentConnection)) + return refreshConnection(connectionState); ProcessResponse processResponse = pullAndProcessData(pollIntervalUS); gatewayThrottler.handleCall(processResponse.transitiveErrorCount); } catch (ServerResponseException ser) { - log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint - + "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '" - + Exceptions.toMessageString(ser) + "'",ser); - return ThreadState.CONNECTED; + log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + + "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '" + + Exceptions.toMessageString(ser) + "'",ser); + return ConnectionState.CONNECTED; } - catch (Throwable e) { // Covers IOException as well - log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint - + "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e); + catch (Throwable e) { + log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + + "'. Will re-try. Connection level error. Failed with '" + + Exceptions.toMessageString(e) + "'", e); currentConnection.close(); - return ThreadState.DISCONNECTED; + return ConnectionState.DISCONNECTED; } - return ThreadState.SESSION_SYNCED; + return ConnectionState.SESSION_SYNCED; default: { log.severe("Should never get here."); currentConnection.close(); - return ThreadState.DISCONNECTED; + return ConnectionState.DISCONNECTED; } } } - private void sleepIfProblemsGettingSyncedConnection(ThreadState newState, ThreadState oldState) { - if (newState == ThreadState.SESSION_SYNCED) return; - if (newState == ThreadState.CONNECTED && oldState == ThreadState.DISCONNECTED) return; + private boolean isStale(GatewayConnection connection) { + return connection.connectionTime() != null + && connection.connectionTime().plus(connectionTimeToLive).isBefore(Clock.systemUTC().instant()); + } + + private ConnectionState refreshConnection(ConnectionState currentConnectionState) { + if (currentConnectionState == ConnectionState.SESSION_SYNCED) + oldConnections.add(currentConnection); + currentConnection = connectionFactory.newConnection(); + return ConnectionState.DISCONNECTED; + } + + private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) { + if (newState == ConnectionState.SESSION_SYNCED) return; + if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return; try { // Take it easy we have problems getting a connection up. if (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) { @@ -391,11 +393,12 @@ class IOThread implements Runnable, AutoCloseable { @Override public void run() { - ThreadState threadState = ThreadState.DISCONNECTED; + ConnectionState connectionState = ConnectionState.DISCONNECTED; while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) { - ThreadState oldState = threadState; - threadState = cycle(threadState); - sleepIfProblemsGettingSyncedConnection(threadState, oldState); + ConnectionState oldState = connectionState; + connectionState = cycle(connectionState); + checkOldConnections(); + sleepIfProblemsGettingSyncedConnection(connectionState, oldState); } log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size()); @@ -410,8 +413,8 @@ class IOThread implements Runnable, AutoCloseable { EndpointResult endpointResult = EndPointResultFactory.createTransientError( endpoint, document.get().getOperationId(), - new Exception("Not sending document operation, timed out in queue after " - + document.get().timeInQueueMillis() + " ms.")); + new Exception("Not sending document operation, timed out in queue after " + + document.get().timeInQueueMillis() + " ms.")); resultQueue.failOperation(endpointResult, clusterId); } } @@ -427,4 +430,81 @@ class IOThread implements Runnable, AutoCloseable { } } + private void checkOldConnections() { + if (resultQueue.getPendingSize() == 0) { + oldConnections.forEach(c -> c.close()); + oldConnections.clear(); + return; + } + + for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { + GatewayConnection connection = i.next(); + if (closingTime(connection).isBefore(Clock.systemUTC().instant())) { + connection.close(); + i.remove();; + } + else if (timeToPoll(connection)) { + try { + processResponse(connection.poll()); + } + catch (Exception e) { + // Old connection; this may be ok + } + } + + } + } + + private Instant closingTime(GatewayConnection connection) { + return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); + } + + private boolean timeToPoll(GatewayConnection connection) { + if (connection.lastPollTime() == null) return true; + + // Poll less the closer the connection comes to closing time + double newness = ( closingTime(connection).toEpochMilli() - Clock.systemUTC().instant().toEpochMilli() ) / + (double)localQueueTimeOut.toMillis(); + if (newness < 0) return true; // connection retired prematurely + if (newness > 1) return false; // closing time reached + Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 + + (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000))); + return connection.lastPollTime().plus(pollInterval).isBefore(Clock.systemUTC().instant()); + } + + public static class ConnectionStats { + + // NOTE: These fields are accessed by reflection in JSON serialization + + public final int wrongSessionDetectedCounter; + public final int wrongVersionDetectedCounter; + public final int problemStatusCodeFromServerCounter; + public final int executeProblemsCounter; + public final int docsReceivedCounter; + public final int statusReceivedCounter; + public final int pendingDocumentStatusCount; + public final int successfullHandshakes; + public final int lastGatewayProcessTimeMillis; + + ConnectionStats(int wrongSessionDetectedCounter, + int wrongVersionDetectedCounter, + int problemStatusCodeFromServerCounter, + int executeProblemsCounter, + int docsReceivedCounter, + int statusReceivedCounter, + int pendingDocumentStatusCount, + int successfullHandshakes, + int lastGatewayProcessTimeMillis) { + this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; + this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; + this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; + this.executeProblemsCounter = executeProblemsCounter; + this.docsReceivedCounter = docsReceivedCounter; + this.statusReceivedCounter = statusReceivedCounter; + this.pendingDocumentStatusCount = pendingDocumentStatusCount; + this.successfullHandshakes = successfullHandshakes; + this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis; + } + } + } |