diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:24:23 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:27:19 +0200 |
commit | c3bfa8316ac359dffce121f1900e5bf63ec11cf0 (patch) | |
tree | a57629208114a5dc082b521e353c563beab94f79 /vespa-http-client | |
parent | ff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff) |
Close connection early if no inflight operations
Diffstat (limited to 'vespa-http-client')
5 files changed, 103 insertions, 80 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 623ea543ffb..9ef3e5eee15 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -45,7 +45,7 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream write(List<Document> docs) throws IOException { + public synchronized InputStream write(List<Document> docs) throws IOException { if (throwThisOnWrite != null) throw throwThisOnWrite; @@ -65,27 +65,27 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream poll() throws IOException { + public synchronized InputStream poll() throws IOException { lastPollTime = clock.instant(); return write(new ArrayList<>()); } @Override - public Instant lastPollTime() { return lastPollTime; } + public synchronized Instant lastPollTime() { return lastPollTime; } @Override - public InputStream drain() throws IOException { + public synchronized InputStream drain() throws IOException { return write(new ArrayList<>()); } @Override - public boolean connect() { + public synchronized boolean connect() { connectionTime = clock.instant(); return true; } @Override - public Instant connectionTime() { return connectionTime; } + public synchronized Instant connectionTime() { return connectionTime; } @Override public Endpoint getEndpoint() { @@ -93,26 +93,26 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() throws ServerResponseException { + public synchronized void handshake() throws ServerResponseException { if (throwThisOnHandshake != null) throw throwThisOnHandshake; } @Override - public void close() { } + public synchronized void close() { } - public void hold(boolean hold) { + public synchronized void hold(boolean hold) { this.hold = hold; } /** Returns the document currently held in this */ - public List<Document> held() { return Collections.unmodifiableList(held); } + public synchronized List<Document> held() { return Collections.unmodifiableList(held); } - public void throwOnWrite(IOException throwThisOnWrite) { + public synchronized void throwOnWrite(IOException throwThisOnWrite) { this.throwThisOnWrite = throwThisOnWrite; } - public void throwOnHandshake(ServerResponseException throwThisOnHandshake) { + public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) { this.throwThisOnHandshake = throwThisOnHandshake; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index a5a37a31665..473dfd4b702 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -3,8 +3,8 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.util.HashMap; @@ -25,8 +25,7 @@ class EndpointResultQueue { private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName()); private final OperationProcessor operationProcessor; - /** The currently in flight operations */ - private final Map<String, TimerFuture> futureByOperation = new HashMap<>(); + private final Map<String, InflightOperation> inflightOperations = new HashMap<>(); private final Endpoint endpoint; private final int clusterId; @@ -45,10 +44,10 @@ class EndpointResultQueue { this.totalTimeoutMs = totalTimeoutMs; } - public synchronized void operationSent(String operationId) { + public synchronized void operationSent(String operationId, GatewayConnection connection) { DocumentTimerTask task = new DocumentTimerTask(operationId); ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS); - futureByOperation.put(operationId, new TimerFuture(future)); + inflightOperations.put(operationId, new InflightOperation(future, connection)); } public synchronized void failOperation(EndpointResult result, int clusterId) { @@ -65,8 +64,8 @@ class EndpointResultQueue { private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); - TimerFuture timerFuture = futureByOperation.remove(result.getOperationId()); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(result.getOperationId()); + if (operation == null) { if (duplicateGivesWarning) { log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint + "', but we have no record of a sent operation. Either something is wrong on the server side " + @@ -75,13 +74,13 @@ class EndpointResultQueue { } return; } - timerFuture.getFuture().cancel(false); + operation.future.cancel(false); } /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */ private synchronized void timeout(String operationId) { - TimerFuture timerFuture = futureByOperation.remove(operationId); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(operationId); + if (operation == null) { log.finer("Timeout of operation '" + operationId + "', but operation " + "not found in map. Result was probably received just-in-time from server, while timeout " + "task could not be cancelled."); @@ -93,20 +92,21 @@ class EndpointResultQueue { } public synchronized int getPendingSize() { - return futureByOperation.values().size(); + return inflightOperations.values().size(); } public synchronized void failPending(Exception exception) { - for (Map.Entry<String, TimerFuture> timerFutureEntry : futureByOperation.entrySet()) { - timerFutureEntry.getValue().getFuture().cancel(false); - failedOperationId(timerFutureEntry.getKey(), exception); - } - futureByOperation.clear(); + inflightOperations.forEach((operationId, operation) -> { + operation.future.cancel(false); + EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception); + operationProcessor.resultReceived(result, clusterId); + }); + inflightOperations.clear(); } - private synchronized void failedOperationId(String operationId, Exception exception) { - EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception); - operationProcessor.resultReceived(endpointResult, clusterId); + public synchronized boolean hasInflightOperations(GatewayConnection connection) { + return inflightOperations.entrySet().stream() + .anyMatch(entry -> entry.getValue().connection.equals(connection)); } private class DocumentTimerTask implements Runnable { @@ -124,18 +124,13 @@ class EndpointResultQueue { } - private static class TimerFuture { - - private final ScheduledFuture<?> future; + private static class InflightOperation { + final ScheduledFuture<?> future; + final GatewayConnection connection; - public TimerFuture(ScheduledFuture<?> future) { + InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) { this.future = future; + this.connection = connection; } - - private ScheduledFuture<?> getFuture() { - return future; - } - } - } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index aa914f852c3..3feebc2029a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -6,10 +6,10 @@ import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.Exceptions; import com.yahoo.vespa.http.client.core.ServerResponseException; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import java.io.IOException; import java.io.InputStream; @@ -58,8 +58,8 @@ public class IOThread implements Runnable, AutoCloseable { private final Random random = new Random(); private final OldConnectionsDrainer oldConnectionsDrainer; - private GatewayConnection currentConnection; - private ConnectionState connectionState = ConnectionState.DISCONNECTED; + private volatile GatewayConnection currentConnection; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); @@ -224,7 +224,7 @@ public class IOThread implements Runnable, AutoCloseable { private void addDocumentsToResultQueue(List<Document> docs) { for (Document doc : docs) { - resultQueue.operationSent(doc.getOperationId()); + resultQueue.operationSent(doc.getOperationId(), currentConnection); } } @@ -517,6 +517,8 @@ public class IOThread implements Runnable, AutoCloseable { */ private static class OldConnectionsDrainer implements Runnable { + private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName()); + private final Endpoint endpoint; private final int clusterId; private final long pollIntervalUS; @@ -571,35 +573,33 @@ public class IOThread implements Runnable, AutoCloseable { } public void checkOldConnections() { - List<GatewayConnection> toRemove = null; - try { - for (GatewayConnection connection : connections) { - if (closingTime(connection).isBefore(clock.instant())) { - try { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } finally { - connection.close(); - } - } catch (Exception e) { - // Old connection; best effort - } finally { - if (toRemove == null) - toRemove = new ArrayList<>(1); - toRemove.add(connection); - } - } else if (timeToPoll(connection)) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort - } - } + for (GatewayConnection connection : connections) { + if (!resultQueue.hasInflightOperations(connection)) { + log.fine(() -> connection + " no longer has inflight operations"); + closeConnection(connection); + } else if (closingTime(connection).isBefore(clock.instant())) { + log.fine(() -> connection + " still has inflight operations, but drain period is over"); + tryPollAndDrainInflightOperations(connection); + closeConnection(connection); + } else if (timeToPoll(connection)) { + tryPollAndDrainInflightOperations(connection); } - } finally { - if (toRemove != null) - connections.removeAll(toRemove); + } + } + private void closeConnection(GatewayConnection connection) { + log.fine(() -> "Closing " + connection); + connection.close(); + connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration + } + + private void tryPollAndDrainInflightOperations(GatewayConnection connection) { + try { + log.fine(() -> "Polling and draining inflight operations for " + connection); + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Exception e) { + // Old connection; best effort + log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage()); } } @@ -609,6 +609,7 @@ public class IOThread implements Runnable, AutoCloseable { // Exponential (2^x) dropoff: double connectionEndOfLife = connection.connectionTime().plus(connectionTimeToLive).toEpochMilli(); double connectionLastPolled = connection.lastPollTime().toEpochMilli(); + // connectionEndOfLife < connectionLastPolled < clock.millis() return clock.millis() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java index da82079e992..55961e4aa0e 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java @@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import org.junit.Test; +import java.time.Clock; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ public class EndpointResultQueueTest { public void testBasics() { Endpoint endpoint = Endpoint.create("a"); + GatewayConnection connection = new DryRunGatewayConnection(endpoint, Clock.systemUTC()); OperationProcessor mockAggregator = mock(OperationProcessor.class); final AtomicInteger resultCount = new AtomicInteger(0); @@ -39,11 +41,11 @@ public class EndpointResultQueueTest { EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L); - q.operationSent("op1"); + q.operationSent("op1", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op2"); + q.operationSent("op2", connection); assertThat(q.getPendingSize(), is(2)); - q.operationSent("op3"); + q.operationSent("op3", connection); assertThat(q.getPendingSize(), is(3)); q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0); assertThat(q.getPendingSize(), is(2)); @@ -58,9 +60,9 @@ public class EndpointResultQueueTest { assertThat(resultCount.get(), is(5)); - q.operationSent("op4"); + q.operationSent("op4", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op5"); + q.operationSent("op5", connection); assertThat(q.getPendingSize(), is(2)); q.failPending(new RuntimeException()); @@ -72,7 +74,6 @@ public class EndpointResultQueueTest { @Test public void testTimeout() throws InterruptedException { Endpoint endpoint = Endpoint.create("a"); - OperationProcessor mockAggregator = mock(OperationProcessor.class); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { @@ -81,7 +82,7 @@ public class EndpointResultQueueTest { }).when(mockAggregator).resultReceived(any(), eq(0)); EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L); - q.operationSent("1234"); + q.operationSent("1234", new DryRunGatewayConnection(endpoint, Clock.systemUTC())); assert(latch.await(120, TimeUnit.SECONDS)); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index ef90d6853b1..bddfecdfe65 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -137,6 +137,32 @@ public class IOThreadTest { assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); } + @Test + public void old_connections_are_closed_early_if_no_inflight_operations() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); // do not send result for doc1 in next http response + tester.send("doc1"); + tester.tick(1); + tester.clock().advance(Duration.ofSeconds(31)); // Default connection TTL + 1 + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + + firstConnection.hold(false); // send result for both doc1 and doc2 in next http response + tester.send("doc2"); + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + tester.clock().advance(Duration.ofSeconds(2)); + tester.tick(3); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertEquals(0, ioThread.oldConnections().size()); + } + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, int advanceSeconds, DryRunGatewayConnection connection, |