summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2018-10-16 21:50:20 +0200
committerGitHub <noreply@github.com>2018-10-16 21:50:20 +0200
commit2f6d6c559c310280a74f134970614ae4ee95a22f (patch)
tree4961cbc47fbc893378096d7b7628eb5cf3165628 /vespa-http-client
parent16c10f396298967a5d144518227f2e36bfa80eb2 (diff)
parent1685b5eff9084ca7da4637432f6c661b7542ddaa (diff)
Merge pull request #7328 from vespa-engine/jvenstad/fix-bug-in-feed-client-close-logic
Look at oldest send operation, rather than number of running ones
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java21
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java14
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java8
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java11
4 files changed, 35 insertions, 19 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index 5376a96ae4b..16ef8a4bbd2 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -12,6 +12,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
+import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -59,15 +60,14 @@ public class FeedClientImpl implements FeedClient {
@Override
public void close() {
- Instant lastResultReceived = Instant.now();
- long lastNumberOfResults = operationProcessor.getIncompleteResultQueueSize();
+ Instant lastOldestResultReceivedAt = Instant.now();
+ Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId();
- while (waitForOperations(lastResultReceived, lastNumberOfResults, sleepTimeMs, closeTimeoutMs)) {
- long results = operationProcessor.getIncompleteResultQueueSize();
- if (results != lastNumberOfResults) {
- lastResultReceived = Instant.now();
- }
- lastNumberOfResults = results;
+ while (oldestIncompleteId.isPresent() && waitForOperations(lastOldestResultReceivedAt, sleepTimeMs, closeTimeoutMs)) {
+ Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId();
+ if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow))
+ lastOldestResultReceivedAt = Instant.now();
+ oldestIncompleteId = oldestIncompleteIdNow;
}
operationProcessor.close();
}
@@ -78,10 +78,7 @@ public class FeedClientImpl implements FeedClient {
}
// On return value true, wait more. Public for testing.
- public static boolean waitForOperations(Instant lastResultReceived, long lastNumberOfResults, long sleepTimeMs, long closeTimeoutMs) {
- if (lastNumberOfResults == 0) {
- return false;
- }
+ public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) {
if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) {
return false;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index cff6ad2ed48..d300bead9c1 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -18,8 +18,11 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -35,7 +38,7 @@ import java.util.logging.Logger;
public class OperationProcessor {
private static final Logger log = Logger.getLogger(OperationProcessor.class.getName());
- private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new HashMap<>();
+ private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>();
private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create();
private final Set<String> inflightDocumentIds = new HashSet<>();
private final int numDestinations;
@@ -103,6 +106,15 @@ public class OperationProcessor {
}
}
+ /** Returns the id of the oldest operation to be sent. */
+ public Optional<String> oldestIncompleteResultId() {
+ synchronized (monitor) {
+ return docSendInfoByOperationId.isEmpty()
+ ? Optional.empty()
+ : Optional.of(docSendInfoByOperationId.keySet().iterator().next());
+ }
+ }
+
public String getClientId() {
return clientId;
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java
index 2eb4d243ee3..b8470fd489f 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java
@@ -17,16 +17,12 @@ public class FeedClientImplTest {
@Test
public void testCloseWaitTimeOldTimestamp() {
- assertThat(FeedClientImpl.waitForOperations(Instant.now().minusSeconds(1000), 1, sleepValueMillis, 10), is(false));
+ assertThat(FeedClientImpl.waitForOperations(Instant.now().minusSeconds(1000), sleepValueMillis, 10), is(false));
}
@Test
public void testCloseWaitTimeOutInFutureStillOperations() {
- assertThat(FeedClientImpl.waitForOperations(Instant.now(), 1, sleepValueMillis, 2000), is(true));
+ assertThat(FeedClientImpl.waitForOperations(Instant.now(), sleepValueMillis, 2000), is(true));
}
- @Test
- public void testCloseWaitZeroOperations() {
- assertThat(FeedClientImpl.waitForOperations(Instant.now(), 0, sleepValueMillis, 2000), is(false));
- }
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
index 774a516cc14..3a335aa5da0 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
@@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult;
import org.junit.Test;
import java.util.ArrayDeque;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -19,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -173,12 +175,15 @@ public class OperationProcessorTest {
assertThat(queue.size(), is(0));
// Only one operations should be in flight.
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
assertThat(queue.size(), is(1));
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1b.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
assertThat(queue.size(), is(2));
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.empty()));
// This should have no effect.
operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
@@ -239,18 +244,24 @@ public class OperationProcessorTest {
assertThat(queue.size(), is(0));
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId())));
// This should have no effect since it should not be sent.
operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0);
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(endpoint)), 0);
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(2));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(endpoint)), 0);
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(endpoint)), 0);
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.of(doc1b.getOperationId())));
operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0);
assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ assertThat(operationProcessor.oldestIncompleteResultId(), is(Optional.empty()));
}
@Test