aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-09-30 15:24:23 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-09-30 15:27:19 +0200
commitc3bfa8316ac359dffce121f1900e5bf63ec11cf0 (patch)
treea57629208114a5dc082b521e353c563beab94f79 /vespa-http-client
parentff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff)
Close connection early if no inflight operations
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java53
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java65
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java15
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java26
5 files changed, 103 insertions, 80 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 623ea543ffb..9ef3e5eee15 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -45,7 +45,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream write(List<Document> docs) throws IOException {
+ public synchronized InputStream write(List<Document> docs) throws IOException {
if (throwThisOnWrite != null)
throw throwThisOnWrite;
@@ -65,27 +65,27 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream poll() throws IOException {
+ public synchronized InputStream poll() throws IOException {
lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@Override
- public Instant lastPollTime() { return lastPollTime; }
+ public synchronized Instant lastPollTime() { return lastPollTime; }
@Override
- public InputStream drain() throws IOException {
+ public synchronized InputStream drain() throws IOException {
return write(new ArrayList<>());
}
@Override
- public boolean connect() {
+ public synchronized boolean connect() {
connectionTime = clock.instant();
return true;
}
@Override
- public Instant connectionTime() { return connectionTime; }
+ public synchronized Instant connectionTime() { return connectionTime; }
@Override
public Endpoint getEndpoint() {
@@ -93,26 +93,26 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() throws ServerResponseException {
+ public synchronized void handshake() throws ServerResponseException {
if (throwThisOnHandshake != null)
throw throwThisOnHandshake;
}
@Override
- public void close() { }
+ public synchronized void close() { }
- public void hold(boolean hold) {
+ public synchronized void hold(boolean hold) {
this.hold = hold;
}
/** Returns the document currently held in this */
- public List<Document> held() { return Collections.unmodifiableList(held); }
+ public synchronized List<Document> held() { return Collections.unmodifiableList(held); }
- public void throwOnWrite(IOException throwThisOnWrite) {
+ public synchronized void throwOnWrite(IOException throwThisOnWrite) {
this.throwThisOnWrite = throwThisOnWrite;
}
- public void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
+ public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
this.throwThisOnHandshake = throwThisOnHandshake;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index a5a37a31665..473dfd4b702 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -3,8 +3,8 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.util.HashMap;
@@ -25,8 +25,7 @@ class EndpointResultQueue {
private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName());
private final OperationProcessor operationProcessor;
- /** The currently in flight operations */
- private final Map<String, TimerFuture> futureByOperation = new HashMap<>();
+ private final Map<String, InflightOperation> inflightOperations = new HashMap<>();
private final Endpoint endpoint;
private final int clusterId;
@@ -45,10 +44,10 @@ class EndpointResultQueue {
this.totalTimeoutMs = totalTimeoutMs;
}
- public synchronized void operationSent(String operationId) {
+ public synchronized void operationSent(String operationId, GatewayConnection connection) {
DocumentTimerTask task = new DocumentTimerTask(operationId);
ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS);
- futureByOperation.put(operationId, new TimerFuture(future));
+ inflightOperations.put(operationId, new InflightOperation(future, connection));
}
public synchronized void failOperation(EndpointResult result, int clusterId) {
@@ -65,8 +64,8 @@ class EndpointResultQueue {
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
- TimerFuture timerFuture = futureByOperation.remove(result.getOperationId());
- if (timerFuture == null) {
+ InflightOperation operation = inflightOperations.remove(result.getOperationId());
+ if (operation == null) {
if (duplicateGivesWarning) {
log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint +
"', but we have no record of a sent operation. Either something is wrong on the server side " +
@@ -75,13 +74,13 @@ class EndpointResultQueue {
}
return;
}
- timerFuture.getFuture().cancel(false);
+ operation.future.cancel(false);
}
/** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */
private synchronized void timeout(String operationId) {
- TimerFuture timerFuture = futureByOperation.remove(operationId);
- if (timerFuture == null) {
+ InflightOperation operation = inflightOperations.remove(operationId);
+ if (operation == null) {
log.finer("Timeout of operation '" + operationId + "', but operation " +
"not found in map. Result was probably received just-in-time from server, while timeout " +
"task could not be cancelled.");
@@ -93,20 +92,21 @@ class EndpointResultQueue {
}
public synchronized int getPendingSize() {
- return futureByOperation.values().size();
+ return inflightOperations.values().size();
}
public synchronized void failPending(Exception exception) {
- for (Map.Entry<String, TimerFuture> timerFutureEntry : futureByOperation.entrySet()) {
- timerFutureEntry.getValue().getFuture().cancel(false);
- failedOperationId(timerFutureEntry.getKey(), exception);
- }
- futureByOperation.clear();
+ inflightOperations.forEach((operationId, operation) -> {
+ operation.future.cancel(false);
+ EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception);
+ operationProcessor.resultReceived(result, clusterId);
+ });
+ inflightOperations.clear();
}
- private synchronized void failedOperationId(String operationId, Exception exception) {
- EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception);
- operationProcessor.resultReceived(endpointResult, clusterId);
+ public synchronized boolean hasInflightOperations(GatewayConnection connection) {
+ return inflightOperations.entrySet().stream()
+ .anyMatch(entry -> entry.getValue().connection.equals(connection));
}
private class DocumentTimerTask implements Runnable {
@@ -124,18 +124,13 @@ class EndpointResultQueue {
}
- private static class TimerFuture {
-
- private final ScheduledFuture<?> future;
+ private static class InflightOperation {
+ final ScheduledFuture<?> future;
+ final GatewayConnection connection;
- public TimerFuture(ScheduledFuture<?> future) {
+ InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) {
this.future = future;
+ this.connection = connection;
}
-
- private ScheduledFuture<?> getFuture() {
- return future;
- }
-
}
-
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index aa914f852c3..3feebc2029a 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -6,10 +6,10 @@ import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.Exceptions;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.ServerResponseException;
+import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.io.IOException;
import java.io.InputStream;
@@ -58,8 +58,8 @@ public class IOThread implements Runnable, AutoCloseable {
private final Random random = new Random();
private final OldConnectionsDrainer oldConnectionsDrainer;
- private GatewayConnection currentConnection;
- private ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ private volatile GatewayConnection currentConnection;
+ private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
@@ -224,7 +224,7 @@ public class IOThread implements Runnable, AutoCloseable {
private void addDocumentsToResultQueue(List<Document> docs) {
for (Document doc : docs) {
- resultQueue.operationSent(doc.getOperationId());
+ resultQueue.operationSent(doc.getOperationId(), currentConnection);
}
}
@@ -517,6 +517,8 @@ public class IOThread implements Runnable, AutoCloseable {
*/
private static class OldConnectionsDrainer implements Runnable {
+ private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName());
+
private final Endpoint endpoint;
private final int clusterId;
private final long pollIntervalUS;
@@ -571,35 +573,33 @@ public class IOThread implements Runnable, AutoCloseable {
}
public void checkOldConnections() {
- List<GatewayConnection> toRemove = null;
- try {
- for (GatewayConnection connection : connections) {
- if (closingTime(connection).isBefore(clock.instant())) {
- try {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } finally {
- connection.close();
- }
- } catch (Exception e) {
- // Old connection; best effort
- } finally {
- if (toRemove == null)
- toRemove = new ArrayList<>(1);
- toRemove.add(connection);
- }
- } else if (timeToPoll(connection)) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Exception e) {
- // Old connection; best effort
- }
- }
+ for (GatewayConnection connection : connections) {
+ if (!resultQueue.hasInflightOperations(connection)) {
+ log.fine(() -> connection + " no longer has inflight operations");
+ closeConnection(connection);
+ } else if (closingTime(connection).isBefore(clock.instant())) {
+ log.fine(() -> connection + " still has inflight operations, but drain period is over");
+ tryPollAndDrainInflightOperations(connection);
+ closeConnection(connection);
+ } else if (timeToPoll(connection)) {
+ tryPollAndDrainInflightOperations(connection);
}
- } finally {
- if (toRemove != null)
- connections.removeAll(toRemove);
+ }
+ }
+ private void closeConnection(GatewayConnection connection) {
+ log.fine(() -> "Closing " + connection);
+ connection.close();
+ connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration
+ }
+
+ private void tryPollAndDrainInflightOperations(GatewayConnection connection) {
+ try {
+ log.fine(() -> "Polling and draining inflight operations for " + connection);
+ IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
+ } catch (Exception e) {
+ // Old connection; best effort
+ log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage());
}
}
@@ -609,6 +609,7 @@ public class IOThread implements Runnable, AutoCloseable {
// Exponential (2^x) dropoff:
double connectionEndOfLife = connection.connectionTime().plus(connectionTimeToLive).toEpochMilli();
double connectionLastPolled = connection.lastPollTime().toEpochMilli();
+ // connectionEndOfLife < connectionLastPolled < clock.millis()
return clock.millis() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife);
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
index da82079e992..55961e4aa0e 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
@@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import org.junit.Test;
+import java.time.Clock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ public class EndpointResultQueueTest {
public void testBasics() {
Endpoint endpoint = Endpoint.create("a");
+ GatewayConnection connection = new DryRunGatewayConnection(endpoint, Clock.systemUTC());
OperationProcessor mockAggregator = mock(OperationProcessor.class);
final AtomicInteger resultCount = new AtomicInteger(0);
@@ -39,11 +41,11 @@ public class EndpointResultQueueTest {
EndpointResultQueue q = new EndpointResultQueue(
mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L);
- q.operationSent("op1");
+ q.operationSent("op1", connection);
assertThat(q.getPendingSize(), is(1));
- q.operationSent("op2");
+ q.operationSent("op2", connection);
assertThat(q.getPendingSize(), is(2));
- q.operationSent("op3");
+ q.operationSent("op3", connection);
assertThat(q.getPendingSize(), is(3));
q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0);
assertThat(q.getPendingSize(), is(2));
@@ -58,9 +60,9 @@ public class EndpointResultQueueTest {
assertThat(resultCount.get(), is(5));
- q.operationSent("op4");
+ q.operationSent("op4", connection);
assertThat(q.getPendingSize(), is(1));
- q.operationSent("op5");
+ q.operationSent("op5", connection);
assertThat(q.getPendingSize(), is(2));
q.failPending(new RuntimeException());
@@ -72,7 +74,6 @@ public class EndpointResultQueueTest {
@Test
public void testTimeout() throws InterruptedException {
Endpoint endpoint = Endpoint.create("a");
-
OperationProcessor mockAggregator = mock(OperationProcessor.class);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocationOnMock -> {
@@ -81,7 +82,7 @@ public class EndpointResultQueueTest {
}).when(mockAggregator).resultReceived(any(), eq(0));
EndpointResultQueue q = new EndpointResultQueue(
mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L);
- q.operationSent("1234");
+ q.operationSent("1234", new DryRunGatewayConnection(endpoint, Clock.systemUTC()));
assert(latch.await(120, TimeUnit.SECONDS));
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index ef90d6853b1..bddfecdfe65 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -137,6 +137,32 @@ public class IOThreadTest {
assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
}
+ @Test
+ public void old_connections_are_closed_early_if_no_inflight_operations() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true); // do not send result for doc1 in next http response
+ tester.send("doc1");
+ tester.tick(1);
+ tester.clock().advance(Duration.ofSeconds(31)); // Default connection TTL + 1
+ tester.tick(1);
+ assertEquals(1, ioThread.oldConnections().size());
+
+ firstConnection.hold(false); // send result for both doc1 and doc2 in next http response
+ tester.send("doc2");
+ tester.tick(1);
+ assertEquals(1, ioThread.oldConnections().size());
+ tester.clock().advance(Duration.ofSeconds(2));
+ tester.tick(3);
+ assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester);
+ assertEquals(0, ioThread.oldConnections().size());
+ }
+
private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
int advanceSeconds,
DryRunGatewayConnection connection,