diff options
author | Jon Bratseth <bratseth@oath.com> | 2018-10-16 21:50:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-16 21:50:20 +0200 |
commit | 2f6d6c559c310280a74f134970614ae4ee95a22f (patch) | |
tree | 4961cbc47fbc893378096d7b7628eb5cf3165628 /vespa-http-client | |
parent | 16c10f396298967a5d144518227f2e36bfa80eb2 (diff) | |
parent | 1685b5eff9084ca7da4637432f6c661b7542ddaa (diff) |
Merge pull request #7328 from vespa-engine/jvenstad/fix-bug-in-feed-client-close-logic
Look at oldest send operation, rather than number of running ones
Diffstat (limited to 'vespa-http-client')
4 files changed, 35 insertions, 19 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index 5376a96ae4b..16ef8a4bbd2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -12,6 +12,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -59,15 +60,14 @@ public class FeedClientImpl implements FeedClient { @Override public void close() { - Instant lastResultReceived = Instant.now(); - long lastNumberOfResults = operationProcessor.getIncompleteResultQueueSize(); + Instant lastOldestResultReceivedAt = Instant.now(); + Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId(); - while (waitForOperations(lastResultReceived, lastNumberOfResults, sleepTimeMs, closeTimeoutMs)) { - long results = operationProcessor.getIncompleteResultQueueSize(); - if (results != lastNumberOfResults) { - lastResultReceived = Instant.now(); - } - lastNumberOfResults = results; + while (oldestIncompleteId.isPresent() && waitForOperations(lastOldestResultReceivedAt, sleepTimeMs, closeTimeoutMs)) { + Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId(); + if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow)) + lastOldestResultReceivedAt = Instant.now(); + oldestIncompleteId = oldestIncompleteIdNow; } operationProcessor.close(); } @@ -78,10 +78,7 @@ public class FeedClientImpl implements FeedClient { } // On return value true, wait more. Public for testing. - public static boolean waitForOperations(Instant lastResultReceived, long lastNumberOfResults, long sleepTimeMs, long closeTimeoutMs) { - if (lastNumberOfResults == 0) { - return false; - } + public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) { if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) { return false; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index cff6ad2ed48..d300bead9c1 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -18,8 +18,11 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,7 +38,7 @@ import java.util.logging.Logger; public class OperationProcessor { private static final Logger log = Logger.getLogger(OperationProcessor.class.getName()); - private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new HashMap<>(); + private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>(); private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create(); private final Set<String> inflightDocumentIds = new HashSet<>(); private final int numDestinations; @@ -103,6 +106,15 @@ public class OperationProcessor { } } + /** Returns the id of the oldest operation to be sent. */ + public Optional<String> oldestIncompleteResultId() { + synchronized (monitor) { + return docSendInfoByOperationId.isEmpty() + ? Optional.empty() + : Optional.of(docSendInfoByOperationId.keySet().iterator().next()); + } + } + public String getClientId() { return clientId; } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java index 2eb4d243ee3..b8470fd489f 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java @@ -17,16 +17,12 @@ public class FeedClientImplTest { @Test public void testCloseWaitTimeOldTimestamp() { - assertThat(FeedClientImpl.waitForOperations(Instant.now().minusSeconds(1000), 1, sleepValueMillis, 10), is(false)); + assertThat(FeedClientImpl.waitForOperations(Instant.now().minusSeconds(1000), sleepValueMillis, 10), is(false)); } @Test public void testCloseWaitTimeOutInFutureStillOperations() { - assertThat(FeedClientImpl.waitForOperations(Instant.now(), 1, sleepValueMillis, 2000), is(true)); + assertThat(FeedClientImpl.waitForOperations(Instant.now(), sleepValueMillis, 2000), is(true)); } - @Test - public void testCloseWaitZeroOperations() { - assertThat(FeedClientImpl.waitForOperations(Instant.now(), 0, sleepValueMillis, 2000), is(false)); - } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index 774a516cc14..3a335aa5da0 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult; import org.junit.Test; import java.util.ArrayDeque; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -19,6 +20,7 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; @@ -173,12 +175,15 @@ public class OperationProcessorTest { assertThat(queue.size(), is(0)); // Only one operations should be in flight. assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0); assertThat(queue.size(), is(1)); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1b.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0); assertThat(queue.size(), is(2)); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.empty())); // This should have no effect. operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0); operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0); @@ -239,18 +244,24 @@ public class OperationProcessorTest { assertThat(queue.size(), is(0)); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId()))); // This should have no effect since it should not be sent. operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(endpoint)), 0); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(2)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(endpoint)), 0); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(endpoint)), 0); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1b.getOperationId()))); operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0); assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0)); + assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.empty())); } @Test |