diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:24:23 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:27:19 +0200 |
commit | c3bfa8316ac359dffce121f1900e5bf63ec11cf0 (patch) | |
tree | a57629208114a5dc082b521e353c563beab94f79 /vespa-http-client/src/main | |
parent | ff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff) |
Close connection early if no inflight operations
Diffstat (limited to 'vespa-http-client/src/main')
3 files changed, 69 insertions, 73 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 623ea543ffb..9ef3e5eee15 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -45,7 +45,7 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream write(List<Document> docs) throws IOException { + public synchronized InputStream write(List<Document> docs) throws IOException { if (throwThisOnWrite != null) throw throwThisOnWrite; @@ -65,27 +65,27 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream poll() throws IOException { + public synchronized InputStream poll() throws IOException { lastPollTime = clock.instant(); return write(new ArrayList<>()); } @Override - public Instant lastPollTime() { return lastPollTime; } + public synchronized Instant lastPollTime() { return lastPollTime; } @Override - public InputStream drain() throws IOException { + public synchronized InputStream drain() throws IOException { return write(new ArrayList<>()); } @Override - public boolean connect() { + public synchronized boolean connect() { connectionTime = clock.instant(); return true; } @Override - public Instant connectionTime() { return connectionTime; } + public synchronized Instant connectionTime() { return connectionTime; } @Override public Endpoint getEndpoint() { @@ -93,26 +93,26 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() throws ServerResponseException { + public synchronized void handshake() throws ServerResponseException { if (throwThisOnHandshake != null) throw throwThisOnHandshake; } @Override - public void close() { } + public synchronized void close() { } - public void hold(boolean hold) { + public synchronized void hold(boolean hold) { this.hold = hold; } /** Returns the document currently held in this */ - public List<Document> held() { return Collections.unmodifiableList(held); } + public synchronized List<Document> held() { return Collections.unmodifiableList(held); } - public void throwOnWrite(IOException throwThisOnWrite) { + public synchronized void throwOnWrite(IOException throwThisOnWrite) { this.throwThisOnWrite = throwThisOnWrite; } - public void throwOnHandshake(ServerResponseException throwThisOnHandshake) { + public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) { this.throwThisOnHandshake = throwThisOnHandshake; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index a5a37a31665..473dfd4b702 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -3,8 +3,8 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.util.HashMap; @@ -25,8 +25,7 @@ class EndpointResultQueue { private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName()); private final OperationProcessor operationProcessor; - /** The currently in flight operations */ - private final Map<String, TimerFuture> futureByOperation = new HashMap<>(); + private final Map<String, InflightOperation> inflightOperations = new HashMap<>(); private final Endpoint endpoint; private final int clusterId; @@ -45,10 +44,10 @@ class EndpointResultQueue { this.totalTimeoutMs = totalTimeoutMs; } - public synchronized void operationSent(String operationId) { + public synchronized void operationSent(String operationId, GatewayConnection connection) { DocumentTimerTask task = new DocumentTimerTask(operationId); ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS); - futureByOperation.put(operationId, new TimerFuture(future)); + inflightOperations.put(operationId, new InflightOperation(future, connection)); } public synchronized void failOperation(EndpointResult result, int clusterId) { @@ -65,8 +64,8 @@ class EndpointResultQueue { private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); - TimerFuture timerFuture = futureByOperation.remove(result.getOperationId()); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(result.getOperationId()); + if (operation == null) { if (duplicateGivesWarning) { log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint + "', but we have no record of a sent operation. Either something is wrong on the server side " + @@ -75,13 +74,13 @@ class EndpointResultQueue { } return; } - timerFuture.getFuture().cancel(false); + operation.future.cancel(false); } /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */ private synchronized void timeout(String operationId) { - TimerFuture timerFuture = futureByOperation.remove(operationId); - if (timerFuture == null) { + InflightOperation operation = inflightOperations.remove(operationId); + if (operation == null) { log.finer("Timeout of operation '" + operationId + "', but operation " + "not found in map. Result was probably received just-in-time from server, while timeout " + "task could not be cancelled."); @@ -93,20 +92,21 @@ class EndpointResultQueue { } public synchronized int getPendingSize() { - return futureByOperation.values().size(); + return inflightOperations.values().size(); } public synchronized void failPending(Exception exception) { - for (Map.Entry<String, TimerFuture> timerFutureEntry : futureByOperation.entrySet()) { - timerFutureEntry.getValue().getFuture().cancel(false); - failedOperationId(timerFutureEntry.getKey(), exception); - } - futureByOperation.clear(); + inflightOperations.forEach((operationId, operation) -> { + operation.future.cancel(false); + EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception); + operationProcessor.resultReceived(result, clusterId); + }); + inflightOperations.clear(); } - private synchronized void failedOperationId(String operationId, Exception exception) { - EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception); - operationProcessor.resultReceived(endpointResult, clusterId); + public synchronized boolean hasInflightOperations(GatewayConnection connection) { + return inflightOperations.entrySet().stream() + .anyMatch(entry -> entry.getValue().connection.equals(connection)); } private class DocumentTimerTask implements Runnable { @@ -124,18 +124,13 @@ class EndpointResultQueue { } - private static class TimerFuture { - - private final ScheduledFuture<?> future; + private static class InflightOperation { + final ScheduledFuture<?> future; + final GatewayConnection connection; - public TimerFuture(ScheduledFuture<?> future) { + InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) { this.future = future; + this.connection = connection; } - - private ScheduledFuture<?> getFuture() { - return future; - } - } - } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index aa914f852c3..3feebc2029a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -6,10 +6,10 @@ import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.Exceptions; import com.yahoo.vespa.http.client.core.ServerResponseException; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import java.io.IOException; import java.io.InputStream; @@ -58,8 +58,8 @@ public class IOThread implements Runnable, AutoCloseable { private final Random random = new Random(); private final OldConnectionsDrainer oldConnectionsDrainer; - private GatewayConnection currentConnection; - private ConnectionState connectionState = ConnectionState.DISCONNECTED; + private volatile GatewayConnection currentConnection; + private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); @@ -224,7 +224,7 @@ public class IOThread implements Runnable, AutoCloseable { private void addDocumentsToResultQueue(List<Document> docs) { for (Document doc : docs) { - resultQueue.operationSent(doc.getOperationId()); + resultQueue.operationSent(doc.getOperationId(), currentConnection); } } @@ -517,6 +517,8 @@ public class IOThread implements Runnable, AutoCloseable { */ private static class OldConnectionsDrainer implements Runnable { + private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName()); + private final Endpoint endpoint; private final int clusterId; private final long pollIntervalUS; @@ -571,35 +573,33 @@ public class IOThread implements Runnable, AutoCloseable { } public void checkOldConnections() { - List<GatewayConnection> toRemove = null; - try { - for (GatewayConnection connection : connections) { - if (closingTime(connection).isBefore(clock.instant())) { - try { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } finally { - connection.close(); - } - } catch (Exception e) { - // Old connection; best effort - } finally { - if (toRemove == null) - toRemove = new ArrayList<>(1); - toRemove.add(connection); - } - } else if (timeToPoll(connection)) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort - } - } + for (GatewayConnection connection : connections) { + if (!resultQueue.hasInflightOperations(connection)) { + log.fine(() -> connection + " no longer has inflight operations"); + closeConnection(connection); + } else if (closingTime(connection).isBefore(clock.instant())) { + log.fine(() -> connection + " still has inflight operations, but drain period is over"); + tryPollAndDrainInflightOperations(connection); + closeConnection(connection); + } else if (timeToPoll(connection)) { + tryPollAndDrainInflightOperations(connection); } - } finally { - if (toRemove != null) - connections.removeAll(toRemove); + } + } + private void closeConnection(GatewayConnection connection) { + log.fine(() -> "Closing " + connection); + connection.close(); + connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration + } + + private void tryPollAndDrainInflightOperations(GatewayConnection connection) { + try { + log.fine(() -> "Polling and draining inflight operations for " + connection); + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Exception e) { + // Old connection; best effort + log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage()); } } @@ -609,6 +609,7 @@ public class IOThread implements Runnable, AutoCloseable { // Exponential (2^x) dropoff: double connectionEndOfLife = connection.connectionTime().plus(connectionTimeToLive).toEpochMilli(); double connectionLastPolled = connection.lastPollTime().toEpochMilli(); + // connectionEndOfLife < connectionLastPolled < clock.millis() return clock.millis() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife); } |