diff options
author | Jon Bratseth <bratseth@oath.com> | 2018-10-16 21:50:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-16 21:50:20 +0200 |
commit | 2f6d6c559c310280a74f134970614ae4ee95a22f (patch) | |
tree | 4961cbc47fbc893378096d7b7628eb5cf3165628 /vespa-http-client/src/main | |
parent | 16c10f396298967a5d144518227f2e36bfa80eb2 (diff) | |
parent | 1685b5eff9084ca7da4637432f6c661b7542ddaa (diff) |
Merge pull request #7328 from vespa-engine/jvenstad/fix-bug-in-feed-client-close-logic
Look at oldest send operation, rather than number of running ones
Diffstat (limited to 'vespa-http-client/src/main')
2 files changed, 22 insertions, 13 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index 5376a96ae4b..16ef8a4bbd2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -12,6 +12,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -59,15 +60,14 @@ public class FeedClientImpl implements FeedClient { @Override public void close() { - Instant lastResultReceived = Instant.now(); - long lastNumberOfResults = operationProcessor.getIncompleteResultQueueSize(); + Instant lastOldestResultReceivedAt = Instant.now(); + Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId(); - while (waitForOperations(lastResultReceived, lastNumberOfResults, sleepTimeMs, closeTimeoutMs)) { - long results = operationProcessor.getIncompleteResultQueueSize(); - if (results != lastNumberOfResults) { - lastResultReceived = Instant.now(); - } - lastNumberOfResults = results; + while (oldestIncompleteId.isPresent() && waitForOperations(lastOldestResultReceivedAt, sleepTimeMs, closeTimeoutMs)) { + Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId(); + if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow)) + lastOldestResultReceivedAt = Instant.now(); + oldestIncompleteId = oldestIncompleteIdNow; } operationProcessor.close(); } @@ -78,10 +78,7 @@ public class FeedClientImpl implements FeedClient { } // On return value true, wait more. Public for testing. - public static boolean waitForOperations(Instant lastResultReceived, long lastNumberOfResults, long sleepTimeMs, long closeTimeoutMs) { - if (lastNumberOfResults == 0) { - return false; - } + public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) { if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) { return false; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index cff6ad2ed48..d300bead9c1 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -18,8 +18,11 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,7 +38,7 @@ import java.util.logging.Logger; public class OperationProcessor { private static final Logger log = Logger.getLogger(OperationProcessor.class.getName()); - private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new HashMap<>(); + private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>(); private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create(); private final Set<String> inflightDocumentIds = new HashSet<>(); private final int numDestinations; @@ -103,6 +106,15 @@ public class OperationProcessor { } } + /** Returns the id of the oldest operation to be sent. */ + public Optional<String> oldestIncompleteResultId() { + synchronized (monitor) { + return docSendInfoByOperationId.isEmpty() + ? Optional.empty() + : Optional.of(docSendInfoByOperationId.keySet().iterator().next()); + } + } + public String getClientId() { return clientId; } |