diff options
Diffstat (limited to 'vespa-http-client')
7 files changed, 154 insertions, 125 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java index 2417a4acf71..733e5bca424 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java @@ -47,7 +47,7 @@ public final class ConnectionParams { private int traceEveryXOperation = 0; private boolean printTraceToStdErr = true; private boolean useTlsConfigFromEnvironment = false; - private Duration connectionTimeToLive = Duration.ofSeconds(15); + private Duration connectionTimeToLive = Duration.ofSeconds(30); private Path privateKey; private Path certificate; private Path caCertificates; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index 16dd90ed7ac..f69bdc2a91d 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -23,6 +23,7 @@ import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.message.BasicHeader; +import org.apache.http.util.EntityUtils; import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; @@ -246,24 +247,21 @@ class ApacheGatewayConnection implements GatewayConnection { } private InputStream executePost(HttpPost httpPost) throws ServerResponseException, IOException { - HttpResponse response; - try { - if (httpClient == null) - throw new IOException("Trying to executePost while not having a connection/http client"); - response = httpClient.execute(httpPost); - } catch (Exception e) { - httpPost.abort(); - throw e; - } + if (httpClient == null) + throw new IOException("Trying to executePost while not having a connection/http client"); + HttpResponse response = httpClient.execute(httpPost); try { verifyServerResponseCode(response); verifyServerVersion(response.getFirstHeader(Headers.VERSION)); verifySessionHeader(response.getFirstHeader(Headers.SESSION_ID)); } catch (ServerResponseException e) { - httpPost.abort(); + // Ensure response is consumed to allow connection reuse later on + EntityUtils.consumeQuietly(response.getEntity()); throw e; } - return response.getEntity().getContent(); + // Consume response now to allow connection to be reused immediately + byte[] responseData = EntityUtils.toByteArray(response.getEntity()); + return responseData == null ? null : new ByteArrayInputStream(responseData); } private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 623ea543ffb..9ef3e5eee15 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -45,7 +45,7 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream write(List<Document> docs) throws IOException { + public synchronized InputStream write(List<Document> docs) throws IOException { if (throwThisOnWrite != null) throw throwThisOnWrite; @@ -65,27 +65,27 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream poll() throws IOException { + public synchronized InputStream poll() throws IOException { lastPollTime = clock.instant(); return write(new ArrayList<>()); } @Override - public Instant lastPollTime() { return lastPollTime; } + public synchronized Instant lastPollTime() { return lastPollTime; } @Override - public InputStream drain() throws IOException { + public synchronized InputStream drain() throws IOException { return write(new ArrayList<>()); } @Override - public boolean connect() { + public synchronized boolean connect() { connectionTime = clock.instant(); return true; } @Override - public Instant connectionTime() { return connectionTime; } + public synchronized Instant connectionTime() { return connectionTime; } @Override public Endpoint getEndpoint() { @@ -93,26 +93,26 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() throws ServerResponseException { + public synchronized void handshake() throws ServerResponseException { if (throwThisOnHandshake != null) throw throwThisOnHandshake; } @Override - public void close() { } + public synchronized void close() { } - public void hold(boolean hold) { + public synchronized void hold(boolean hold) { this.hold = hold; } /** Returns the document currently held in this */ - public List<Document> held() { return Collections.unmodifiableList(held); } + public synchronized List<Document> held() { return Collections.unmodifiableList(held); } - public void throwOnWrite(IOException throwThisOnWrite) { + public synchronized void throwOnWrite(IOException throwThisOnWrite) { this.throwThisOnWrite = throwThisOnWrite; } - public void throwOnHandshake(ServerResponseException throwThisOnHandshake) { + public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) { this.throwThisOnHandshake = throwThisOnHandshake; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index a5a37a31665..473dfd4b702 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -3,8 +3,8 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.util.HashMap; @@ -25,8 +25,7 @@ class EndpointResultQueue { private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName()); private final OperationProcessor operationProcessor; - /** The currently in flight operations */ - private final Map<String, TimerFuture> futureByOperation = new HashMap<>(); + private final Map<String, InflightOperation> inflightOperations = new HashMap<>(); private final Endpoint endpoint; private final int clusterId; @@ -45,10 +44,10 @@ class EndpointResultQueue { this.totalTimeoutMs = totalTimeoutMs; } - public synchronized void operationSent(String operationId) { + public synchronized void operationSent(String operationId, GatewayConnection connection) { DocumentTimerTask task = new DocumentTimerTask(operationId); ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS); - futureByOperation.put(operationId, new TimerFuture(future)); + inflightOperations.put(operationId, new InflightOperation(future, connection)); } public synchronized void failOperation(EndpointResult result, int clusterId) { @@ -65,8 +64,8 @@ class EndpointResultQueue { private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); - TimerFuture timerFuture = futureByOperation.remove(result.getOperationId()); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(result.getOperationId()); + if (operation == null) { if (duplicateGivesWarning) { log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint + "', but we have no record of a sent operation. Either something is wrong on the server side " + @@ -75,13 +74,13 @@ class EndpointResultQueue { } return; } - timerFuture.getFuture().cancel(false); + operation.future.cancel(false); } /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */ private synchronized void timeout(String operationId) { - TimerFuture timerFuture = futureByOperation.remove(operationId); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(operationId); + if (operation == null) { log.finer("Timeout of operation '" + operationId + "', but operation " + "not found in map. Result was probably received just-in-time from server, while timeout " + "task could not be cancelled."); @@ -93,20 +92,21 @@ class EndpointResultQueue { } public synchronized int getPendingSize() { - return futureByOperation.values().size(); + return inflightOperations.values().size(); } public synchronized void failPending(Exception exception) { - for (Map.Entry<String, TimerFuture> timerFutureEntry : futureByOperation.entrySet()) { - timerFutureEntry.getValue().getFuture().cancel(false); - failedOperationId(timerFutureEntry.getKey(), exception); - } - futureByOperation.clear(); + inflightOperations.forEach((operationId, operation) -> { + operation.future.cancel(false); + EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception); + operationProcessor.resultReceived(result, clusterId); + }); + inflightOperations.clear(); } - private synchronized void failedOperationId(String operationId, Exception exception) { - EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception); - operationProcessor.resultReceived(endpointResult, clusterId); + public synchronized boolean hasInflightOperations(GatewayConnection connection) { + return inflightOperations.entrySet().stream() + .anyMatch(entry -> entry.getValue().connection.equals(connection)); } private class DocumentTimerTask implements Runnable { @@ -124,18 +124,13 @@ class EndpointResultQueue { } - private static class TimerFuture { - - private final ScheduledFuture<?> future; + private static class InflightOperation { + final ScheduledFuture<?> future; + final GatewayConnection connection; - public TimerFuture(ScheduledFuture<?> future) { + InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) { this.future = future; + this.connection = connection; } - - private ScheduledFuture<?> getFuture() { - return future; - } - } - } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index aa914f852c3..4ceb10d4852 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -6,10 +6,10 @@ import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.Exceptions; import com.yahoo.vespa.http.client.core.ServerResponseException; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import java.io.IOException; import java.io.InputStream; @@ -58,8 +58,8 @@ public class IOThread implements Runnable, AutoCloseable { private final Random random = new Random(); private final OldConnectionsDrainer oldConnectionsDrainer; - private GatewayConnection currentConnection; - private ConnectionState connectionState = ConnectionState.DISCONNECTED; + private volatile GatewayConnection currentConnection; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); @@ -96,12 +96,12 @@ public class IOThread implements Runnable, AutoCloseable { this.maxInFlightRequests = maxInFlightRequests; this.connectionTimeToLive = connectionTimeToLive; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); - this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] + this.pollIntervalUS = Math.max(1000, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1ms, 10s] this.clock = clock; this.localQueueTimeOut = localQueueTimeOut; this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, clusterId, - pollIntervalUS, + Duration.ofMillis(pollIntervalUS/1000), connectionTimeToLive, localQueueTimeOut, statusReceivedCounter, @@ -224,7 +224,7 @@ public class IOThread implements Runnable, AutoCloseable { private void addDocumentsToResultQueue(List<Document> docs) { for (Document doc : docs) { - resultQueue.operationSent(doc.getOperationId()); + resultQueue.operationSent(doc.getOperationId(), currentConnection); } } @@ -517,9 +517,11 @@ public class IOThread implements Runnable, AutoCloseable { */ private static class OldConnectionsDrainer implements Runnable { + private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName()); + private final Endpoint endpoint; private final int clusterId; - private final long pollIntervalUS; + private final Duration pollInterval; private final Duration connectionTimeToLive; private final Duration localQueueTimeOut; private final AtomicInteger statusReceivedCounter; @@ -535,7 +537,7 @@ public class IOThread implements Runnable, AutoCloseable { OldConnectionsDrainer(Endpoint endpoint, int clusterId, - long pollIntervalUS, + Duration pollInterval, Duration connectionTimeToLive, Duration localQueueTimeOut, AtomicInteger statusReceivedCounter, @@ -544,7 +546,7 @@ public class IOThread implements Runnable, AutoCloseable { Clock clock) { this.endpoint = endpoint; this.clusterId = clusterId; - this.pollIntervalUS = pollIntervalUS; + this.pollInterval = pollInterval; this.connectionTimeToLive = connectionTimeToLive; this.localQueueTimeOut = localQueueTimeOut; this.statusReceivedCounter = statusReceivedCounter; @@ -561,55 +563,62 @@ public class IOThread implements Runnable, AutoCloseable { @Override public void run() { while (stopSignal.getCount() > 0) { - checkOldConnections(); try { - Thread.sleep(pollIntervalUS/1000); + checkOldConnections(); + Thread.sleep(pollInterval.toMillis()); } catch (InterruptedException e) { + log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e); + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e); } } } public void checkOldConnections() { - List<GatewayConnection> toRemove = null; - try { - for (GatewayConnection connection : connections) { - if (closingTime(connection).isBefore(clock.instant())) { - try { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } finally { - connection.close(); - } - } catch (Exception e) { - // Old connection; best effort - } finally { - if (toRemove == null) - toRemove = new ArrayList<>(1); - toRemove.add(connection); - } - } else if (timeToPoll(connection)) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort - } - } + for (GatewayConnection connection : connections) { + if (!resultQueue.hasInflightOperations(connection)) { + log.fine(() -> connection + " no longer has inflight operations"); + closeConnection(connection); + } else if (closingTime(connection).isBefore(clock.instant())) { + log.fine(() -> connection + " still has inflight operations, but drain period is over"); + tryPollAndDrainInflightOperations(connection); + closeConnection(connection); + } else if (timeToPoll(connection)) { + tryPollAndDrainInflightOperations(connection); } - } finally { - if (toRemove != null) - connections.removeAll(toRemove); + } + } + private void closeConnection(GatewayConnection connection) { + log.fine(() -> "Closing " + connection); + connection.close(); + connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration + } + + private void tryPollAndDrainInflightOperations(GatewayConnection connection) { + try { + log.fine(() -> "Polling and draining inflight operations for " + connection); + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Exception e) { + // Old connection; best effort + log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage()); } } private boolean timeToPoll(GatewayConnection connection) { - if (connection.lastPollTime() == null) return true; + // connectionEndOfLife < connectionLastPolled < now + Instant now = clock.instant(); + Instant endOfLife = connection.connectionTime().plus(connectionTimeToLive); + if (connection.lastPollTime() == null) return endOfLife.plus(pollInterval).isBefore(now); + if (connection.lastPollTime().plus(pollInterval).isAfter(now)) return false; // Exponential (2^x) dropoff: - double connectionEndOfLife = connection.connectionTime().plus(connectionTimeToLive).toEpochMilli(); + double connectionEndOfLife = endOfLife.toEpochMilli(); double connectionLastPolled = connection.lastPollTime().toEpochMilli(); - return clock.millis() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife); + return now.toEpochMilli() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife); } private Instant closingTime(GatewayConnection connection) { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java index da82079e992..55961e4aa0e 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java @@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import org.junit.Test; +import java.time.Clock; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ public class EndpointResultQueueTest { public void testBasics() { Endpoint endpoint = Endpoint.create("a"); + GatewayConnection connection = new DryRunGatewayConnection(endpoint, Clock.systemUTC()); OperationProcessor mockAggregator = mock(OperationProcessor.class); final AtomicInteger resultCount = new AtomicInteger(0); @@ -39,11 +41,11 @@ public class EndpointResultQueueTest { EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L); - q.operationSent("op1"); + q.operationSent("op1", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op2"); + q.operationSent("op2", connection); assertThat(q.getPendingSize(), is(2)); - q.operationSent("op3"); + q.operationSent("op3", connection); assertThat(q.getPendingSize(), is(3)); q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0); assertThat(q.getPendingSize(), is(2)); @@ -58,9 +60,9 @@ public class EndpointResultQueueTest { assertThat(resultCount.get(), is(5)); - q.operationSent("op4"); + q.operationSent("op4", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op5"); + q.operationSent("op5", connection); assertThat(q.getPendingSize(), is(2)); q.failPending(new RuntimeException()); @@ -72,7 +74,6 @@ public class EndpointResultQueueTest { @Test public void testTimeout() throws InterruptedException { Endpoint endpoint = Endpoint.create("a"); - OperationProcessor mockAggregator = mock(OperationProcessor.class); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { @@ -81,7 +82,7 @@ public class EndpointResultQueueTest { }).when(mockAggregator).resultReceived(any(), eq(0)); EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L); - q.operationSent("1234"); + q.operationSent("1234", new DryRunGatewayConnection(endpoint, Clock.systemUTC())); assert(latch.await(120, TimeUnit.SECONDS)); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 240adc29197..bddfecdfe65 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -101,42 +101,68 @@ public class IOThreadTest { tester.send("doc1"); tester.tick(1); - tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 + tester.clock().advance(Duration.ofSeconds(31)); // Default connection ttl is 30 tester.tick(3); assertEquals(1, ioThread.oldConnections().size()); assertEquals(firstConnection, ioThread.oldConnections().get(0)); assertNotSame(firstConnection, ioThread.currentConnection()); - assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); + assertEquals(31, firstConnection.lastPollTime().toEpochMilli() / 1000); // Check old connection poll pattern (exponential backoff) - assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(31, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester); tester.clock().advance(Duration.ofSeconds(200)); tester.tick(1); assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); } + @Test + public void old_connections_are_closed_early_if_no_inflight_operations() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); // do not send result for doc1 in next http response + tester.send("doc1"); + tester.tick(1); + tester.clock().advance(Duration.ofSeconds(31)); // Default connection TTL + 1 + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + + firstConnection.hold(false); // send result for both doc1 and doc2 in next http response + tester.send("doc2"); + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + tester.clock().advance(Duration.ofSeconds(2)); + tester.tick(3); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertEquals(0, ioThread.oldConnections().size()); + } + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, int advanceSeconds, DryRunGatewayConnection connection, |