diff options
Diffstat (limited to 'vespa-http-client/src/main/java/com')
2 files changed, 20 insertions, 11 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index eaf869bdcdc..e4a3f91c33e 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -12,6 +12,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -60,14 +61,13 @@ public class FeedClientImpl implements FeedClient { @Override public void close() { Instant lastResultReceived = Instant.now(); - long lastNumberOfResults = operationProcessor.getIncompleteResultQueueSize(); + Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId(); - while (waitForOperations(lastResultReceived, lastNumberOfResults, sleepTimeMs, closeTimeoutMs)) { - long results = operationProcessor.getIncompleteResultQueueSize(); - if (results != lastNumberOfResults) { + while (oldestIncompleteId.isPresent() && waitForOperations(lastResultReceived, sleepTimeMs, closeTimeoutMs)) { + Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId(); + if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow)) lastResultReceived = Instant.now(); - } - lastNumberOfResults = results; + oldestIncompleteId = oldestIncompleteIdNow; } operationProcessor.close(); } @@ -78,10 +78,7 @@ public class FeedClientImpl implements FeedClient { } // On return value true, wait more. Public for testing. - public static boolean waitForOperations(Instant lastResultReceived, long lastNumberOfResults, long sleepTimeMs, long closeTimeoutMs) { - if (lastNumberOfResults == 0) { - return false; - } + public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) { if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) { return false; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index cff6ad2ed48..24f27196394 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -18,8 +18,11 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,7 +38,7 @@ import java.util.logging.Logger; public class OperationProcessor { private static final Logger log = Logger.getLogger(OperationProcessor.class.getName()); - private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new HashMap<>(); + private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>(); private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create(); private final Set<String> inflightDocumentIds = new HashSet<>(); private final int numDestinations; @@ -103,6 +106,15 @@ public class OperationProcessor { } } + /** Returns the id of the oldest operation to be sent. */ + public Optional<String> oldestIncompleteResultId() { + synchronized (monitor) { + return Optional.of(docSendInfoByOperationId.keySet().iterator()) + .filter(Iterator::hasNext) + .map(Iterator::next); + } + } + public String getClientId() { return clientId; } |