aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java20
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java53
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java93
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java15
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java72
7 files changed, 154 insertions, 125 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
index 2417a4acf71..733e5bca424 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
@@ -47,7 +47,7 @@ public final class ConnectionParams {
private int traceEveryXOperation = 0;
private boolean printTraceToStdErr = true;
private boolean useTlsConfigFromEnvironment = false;
- private Duration connectionTimeToLive = Duration.ofSeconds(15);
+ private Duration connectionTimeToLive = Duration.ofSeconds(30);
private Path privateKey;
private Path certificate;
private Path caCertificates;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index 16dd90ed7ac..f69bdc2a91d 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -23,6 +23,7 @@ import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
+import org.apache.http.util.EntityUtils;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
@@ -246,24 +247,21 @@ class ApacheGatewayConnection implements GatewayConnection {
}
private InputStream executePost(HttpPost httpPost) throws ServerResponseException, IOException {
- HttpResponse response;
- try {
- if (httpClient == null)
- throw new IOException("Trying to executePost while not having a connection/http client");
- response = httpClient.execute(httpPost);
- } catch (Exception e) {
- httpPost.abort();
- throw e;
- }
+ if (httpClient == null)
+ throw new IOException("Trying to executePost while not having a connection/http client");
+ HttpResponse response = httpClient.execute(httpPost);
try {
verifyServerResponseCode(response);
verifyServerVersion(response.getFirstHeader(Headers.VERSION));
verifySessionHeader(response.getFirstHeader(Headers.SESSION_ID));
} catch (ServerResponseException e) {
- httpPost.abort();
+ // Ensure response is consumed to allow connection reuse later on
+ EntityUtils.consumeQuietly(response.getEntity());
throw e;
}
- return response.getEntity().getContent();
+ // Consume response now to allow connection to be reused immediately
+ byte[] responseData = EntityUtils.toByteArray(response.getEntity());
+ return responseData == null ? null : new ByteArrayInputStream(responseData);
}
private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 623ea543ffb..9ef3e5eee15 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -45,7 +45,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream write(List<Document> docs) throws IOException {
+ public synchronized InputStream write(List<Document> docs) throws IOException {
if (throwThisOnWrite != null)
throw throwThisOnWrite;
@@ -65,27 +65,27 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream poll() throws IOException {
+ public synchronized InputStream poll() throws IOException {
lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@Override
- public Instant lastPollTime() { return lastPollTime; }
+ public synchronized Instant lastPollTime() { return lastPollTime; }
@Override
- public InputStream drain() throws IOException {
+ public synchronized InputStream drain() throws IOException {
return write(new ArrayList<>());
}
@Override
- public boolean connect() {
+ public synchronized boolean connect() {
connectionTime = clock.instant();
return true;
}
@Override
- public Instant connectionTime() { return connectionTime; }
+ public synchronized Instant connectionTime() { return connectionTime; }
@Override
public Endpoint getEndpoint() {
@@ -93,26 +93,26 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() throws ServerResponseException {
+ public synchronized void handshake() throws ServerResponseException {
if (throwThisOnHandshake != null)
throw throwThisOnHandshake;
}
@Override
- public void close() { }
+ public synchronized void close() { }
- public void hold(boolean hold) {
+ public synchronized void hold(boolean hold) {
this.hold = hold;
}
/** Returns the document currently held in this */
- public List<Document> held() { return Collections.unmodifiableList(held); }
+ public synchronized List<Document> held() { return Collections.unmodifiableList(held); }
- public void throwOnWrite(IOException throwThisOnWrite) {
+ public synchronized void throwOnWrite(IOException throwThisOnWrite) {
this.throwThisOnWrite = throwThisOnWrite;
}
- public void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
+ public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
this.throwThisOnHandshake = throwThisOnHandshake;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index a5a37a31665..473dfd4b702 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -3,8 +3,8 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.util.HashMap;
@@ -25,8 +25,7 @@ class EndpointResultQueue {
private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName());
private final OperationProcessor operationProcessor;
- /** The currently in flight operations */
- private final Map<String, TimerFuture> futureByOperation = new HashMap<>();
+ private final Map<String, InflightOperation> inflightOperations = new HashMap<>();
private final Endpoint endpoint;
private final int clusterId;
@@ -45,10 +44,10 @@ class EndpointResultQueue {
this.totalTimeoutMs = totalTimeoutMs;
}
- public synchronized void operationSent(String operationId) {
+ public synchronized void operationSent(String operationId, GatewayConnection connection) {
DocumentTimerTask task = new DocumentTimerTask(operationId);
ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS);
- futureByOperation.put(operationId, new TimerFuture(future));
+ inflightOperations.put(operationId, new InflightOperation(future, connection));
}
public synchronized void failOperation(EndpointResult result, int clusterId) {
@@ -65,8 +64,8 @@ class EndpointResultQueue {
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
- TimerFuture timerFuture = futureByOperation.remove(result.getOperationId());
- if (timerFuture == null) {
+ InflightOperation operation = inflightOperations.remove(result.getOperationId());
+ if (operation == null) {
if (duplicateGivesWarning) {
log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint +
"', but we have no record of a sent operation. Either something is wrong on the server side " +
@@ -75,13 +74,13 @@ class EndpointResultQueue {
}
return;
}
- timerFuture.getFuture().cancel(false);
+ operation.future.cancel(false);
}
/** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */
private synchronized void timeout(String operationId) {
- TimerFuture timerFuture = futureByOperation.remove(operationId);
- if (timerFuture == null) {
+ InflightOperation operation = inflightOperations.remove(operationId);
+ if (operation == null) {
log.finer("Timeout of operation '" + operationId + "', but operation " +
"not found in map. Result was probably received just-in-time from server, while timeout " +
"task could not be cancelled.");
@@ -93,20 +92,21 @@ class EndpointResultQueue {
}
public synchronized int getPendingSize() {
- return futureByOperation.values().size();
+ return inflightOperations.values().size();
}
public synchronized void failPending(Exception exception) {
- for (Map.Entry<String, TimerFuture> timerFutureEntry : futureByOperation.entrySet()) {
- timerFutureEntry.getValue().getFuture().cancel(false);
- failedOperationId(timerFutureEntry.getKey(), exception);
- }
- futureByOperation.clear();
+ inflightOperations.forEach((operationId, operation) -> {
+ operation.future.cancel(false);
+ EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception);
+ operationProcessor.resultReceived(result, clusterId);
+ });
+ inflightOperations.clear();
}
- private synchronized void failedOperationId(String operationId, Exception exception) {
- EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception);
- operationProcessor.resultReceived(endpointResult, clusterId);
+ public synchronized boolean hasInflightOperations(GatewayConnection connection) {
+ return inflightOperations.entrySet().stream()
+ .anyMatch(entry -> entry.getValue().connection.equals(connection));
}
private class DocumentTimerTask implements Runnable {
@@ -124,18 +124,13 @@ class EndpointResultQueue {
}
- private static class TimerFuture {
-
- private final ScheduledFuture<?> future;
+ private static class InflightOperation {
+ final ScheduledFuture<?> future;
+ final GatewayConnection connection;
- public TimerFuture(ScheduledFuture<?> future) {
+ InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) {
this.future = future;
+ this.connection = connection;
}
-
- private ScheduledFuture<?> getFuture() {
- return future;
- }
-
}
-
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index aa914f852c3..4ceb10d4852 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -6,10 +6,10 @@ import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.Exceptions;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.ServerResponseException;
+import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.io.IOException;
import java.io.InputStream;
@@ -58,8 +58,8 @@ public class IOThread implements Runnable, AutoCloseable {
private final Random random = new Random();
private final OldConnectionsDrainer oldConnectionsDrainer;
- private GatewayConnection currentConnection;
- private ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ private volatile GatewayConnection currentConnection;
+ private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
@@ -96,12 +96,12 @@ public class IOThread implements Runnable, AutoCloseable {
this.maxInFlightRequests = maxInFlightRequests;
this.connectionTimeToLive = connectionTimeToLive;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
- this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
+ this.pollIntervalUS = Math.max(1000, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1ms, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint,
clusterId,
- pollIntervalUS,
+ Duration.ofMillis(pollIntervalUS/1000),
connectionTimeToLive,
localQueueTimeOut,
statusReceivedCounter,
@@ -224,7 +224,7 @@ public class IOThread implements Runnable, AutoCloseable {
private void addDocumentsToResultQueue(List<Document> docs) {
for (Document doc : docs) {
- resultQueue.operationSent(doc.getOperationId());
+ resultQueue.operationSent(doc.getOperationId(), currentConnection);
}
}
@@ -517,9 +517,11 @@ public class IOThread implements Runnable, AutoCloseable {
*/
private static class OldConnectionsDrainer implements Runnable {
+ private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName());
+
private final Endpoint endpoint;
private final int clusterId;
- private final long pollIntervalUS;
+ private final Duration pollInterval;
private final Duration connectionTimeToLive;
private final Duration localQueueTimeOut;
private final AtomicInteger statusReceivedCounter;
@@ -535,7 +537,7 @@ public class IOThread implements Runnable, AutoCloseable {
OldConnectionsDrainer(Endpoint endpoint,
int clusterId,
- long pollIntervalUS,
+ Duration pollInterval,
Duration connectionTimeToLive,
Duration localQueueTimeOut,
AtomicInteger statusReceivedCounter,
@@ -544,7 +546,7 @@ public class IOThread implements Runnable, AutoCloseable {
Clock clock) {
this.endpoint = endpoint;
this.clusterId = clusterId;
- this.pollIntervalUS = pollIntervalUS;
+ this.pollInterval = pollInterval;
this.connectionTimeToLive = connectionTimeToLive;
this.localQueueTimeOut = localQueueTimeOut;
this.statusReceivedCounter = statusReceivedCounter;
@@ -561,55 +563,62 @@ public class IOThread implements Runnable, AutoCloseable {
@Override
public void run() {
while (stopSignal.getCount() > 0) {
- checkOldConnections();
try {
- Thread.sleep(pollIntervalUS/1000);
+ checkOldConnections();
+ Thread.sleep(pollInterval.toMillis());
}
catch (InterruptedException e) {
+ log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e);
}
}
}
public void checkOldConnections() {
- List<GatewayConnection> toRemove = null;
- try {
- for (GatewayConnection connection : connections) {
- if (closingTime(connection).isBefore(clock.instant())) {
- try {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } finally {
- connection.close();
- }
- } catch (Exception e) {
- // Old connection; best effort
- } finally {
- if (toRemove == null)
- toRemove = new ArrayList<>(1);
- toRemove.add(connection);
- }
- } else if (timeToPoll(connection)) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Exception e) {
- // Old connection; best effort
- }
- }
+ for (GatewayConnection connection : connections) {
+ if (!resultQueue.hasInflightOperations(connection)) {
+ log.fine(() -> connection + " no longer has inflight operations");
+ closeConnection(connection);
+ } else if (closingTime(connection).isBefore(clock.instant())) {
+ log.fine(() -> connection + " still has inflight operations, but drain period is over");
+ tryPollAndDrainInflightOperations(connection);
+ closeConnection(connection);
+ } else if (timeToPoll(connection)) {
+ tryPollAndDrainInflightOperations(connection);
}
- } finally {
- if (toRemove != null)
- connections.removeAll(toRemove);
+ }
+ }
+ private void closeConnection(GatewayConnection connection) {
+ log.fine(() -> "Closing " + connection);
+ connection.close();
+ connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration
+ }
+
+ private void tryPollAndDrainInflightOperations(GatewayConnection connection) {
+ try {
+ log.fine(() -> "Polling and draining inflight operations for " + connection);
+ IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
+ } catch (Exception e) {
+ // Old connection; best effort
+ log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage());
}
}
private boolean timeToPoll(GatewayConnection connection) {
- if (connection.lastPollTime() == null) return true;
+ // connectionEndOfLife < connectionLastPolled < now
+ Instant now = clock.instant();
+ Instant endOfLife = connection.connectionTime().plus(connectionTimeToLive);
+ if (connection.lastPollTime() == null) return endOfLife.plus(pollInterval).isBefore(now);
+ if (connection.lastPollTime().plus(pollInterval).isAfter(now)) return false;
// Exponential (2^x) dropoff:
- double connectionEndOfLife = connection.connectionTime().plus(connectionTimeToLive).toEpochMilli();
+ double connectionEndOfLife = endOfLife.toEpochMilli();
double connectionLastPolled = connection.lastPollTime().toEpochMilli();
- return clock.millis() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife);
+ return now.toEpochMilli() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife);
}
private Instant closingTime(GatewayConnection connection) {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
index da82079e992..55961e4aa0e 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
@@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import org.junit.Test;
+import java.time.Clock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ public class EndpointResultQueueTest {
public void testBasics() {
Endpoint endpoint = Endpoint.create("a");
+ GatewayConnection connection = new DryRunGatewayConnection(endpoint, Clock.systemUTC());
OperationProcessor mockAggregator = mock(OperationProcessor.class);
final AtomicInteger resultCount = new AtomicInteger(0);
@@ -39,11 +41,11 @@ public class EndpointResultQueueTest {
EndpointResultQueue q = new EndpointResultQueue(
mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L);
- q.operationSent("op1");
+ q.operationSent("op1", connection);
assertThat(q.getPendingSize(), is(1));
- q.operationSent("op2");
+ q.operationSent("op2", connection);
assertThat(q.getPendingSize(), is(2));
- q.operationSent("op3");
+ q.operationSent("op3", connection);
assertThat(q.getPendingSize(), is(3));
q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0);
assertThat(q.getPendingSize(), is(2));
@@ -58,9 +60,9 @@ public class EndpointResultQueueTest {
assertThat(resultCount.get(), is(5));
- q.operationSent("op4");
+ q.operationSent("op4", connection);
assertThat(q.getPendingSize(), is(1));
- q.operationSent("op5");
+ q.operationSent("op5", connection);
assertThat(q.getPendingSize(), is(2));
q.failPending(new RuntimeException());
@@ -72,7 +74,6 @@ public class EndpointResultQueueTest {
@Test
public void testTimeout() throws InterruptedException {
Endpoint endpoint = Endpoint.create("a");
-
OperationProcessor mockAggregator = mock(OperationProcessor.class);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocationOnMock -> {
@@ -81,7 +82,7 @@ public class EndpointResultQueueTest {
}).when(mockAggregator).resultReceived(any(), eq(0));
EndpointResultQueue q = new EndpointResultQueue(
mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L);
- q.operationSent("1234");
+ q.operationSent("1234", new DryRunGatewayConnection(endpoint, Clock.systemUTC()));
assert(latch.await(120, TimeUnit.SECONDS));
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 240adc29197..bddfecdfe65 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -101,42 +101,68 @@ public class IOThreadTest {
tester.send("doc1");
tester.tick(1);
- tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
+ tester.clock().advance(Duration.ofSeconds(31)); // Default connection ttl is 30
tester.tick(3);
assertEquals(1, ioThread.oldConnections().size());
assertEquals(firstConnection, ioThread.oldConnections().get(0));
assertNotSame(firstConnection, ioThread.currentConnection());
- assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
+ assertEquals(31, firstConnection.lastPollTime().toEpochMilli() / 1000);
// Check old connection poll pattern (exponential backoff)
- assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(31, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(45, 1, firstConnection, tester);
tester.clock().advance(Duration.ofSeconds(200));
tester.tick(1);
assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
}
+ @Test
+ public void old_connections_are_closed_early_if_no_inflight_operations() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true); // do not send result for doc1 in next http response
+ tester.send("doc1");
+ tester.tick(1);
+ tester.clock().advance(Duration.ofSeconds(31)); // Default connection TTL + 1
+ tester.tick(1);
+ assertEquals(1, ioThread.oldConnections().size());
+
+ firstConnection.hold(false); // send result for both doc1 and doc2 in next http response
+ tester.send("doc2");
+ tester.tick(1);
+ assertEquals(1, ioThread.oldConnections().size());
+ tester.clock().advance(Duration.ofSeconds(2));
+ tester.tick(3);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertEquals(0, ioThread.oldConnections().size());
+ }
+
private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
int advanceSeconds,
DryRunGatewayConnection connection,