aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-05-09 12:33:44 +0200
committerGitHub <noreply@github.com>2017-05-09 12:33:44 +0200
commit795969419adfa7e34cda2ba8c959502f43324b09 (patch)
tree073374eeb22a0dba8b05db1b839ac6ea58552514
parent4f36cf1ae65ceaac515c7a3828fef4bd093ddff7 (diff)
Treat document V1 API visiting timeouts with progress as successful (#2401)
Previously, using a visitor with a selection that did not match any buckets visited during the session's lifetime would trigger a timeout error to the client and abort the visiting. With this change, we special case timeouts if they have successfully visited at least 1 bucket and return a successful response for these. The client can then use the updated continuation token for its subsequent request and continue from where the timed out session left off. The timeout special cased handling is done _outside_ the session and control handler to avoid increasing session-internal complexity, and since not all visitor use cases want this behavior (e.g. streaming search).
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java64
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java11
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java39
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java35
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java93
6 files changed, 217 insertions, 27 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
index b46308e0daf..4d05125f417 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
@@ -3,13 +3,15 @@ package com.yahoo.documentapi;
import com.yahoo.vdslib.VisitorStatistics;
+import java.time.Duration;
+
/**
* A class for controlling a visitor supplied through visitor parameters when
* creating the visitor session. The class defines callbacks for reporting
* progress and that the visitor is done. If you want to reimplement the
* default behavior of those callbacks, you can write your own subclass.
*
- * @author <a href="mailto:humbe@yahoo-inc.com">H&aring;kon Humberset</a>
+ * @author Håkon Humberset
*/
public class VisitorControlHandler {
/** Possible completion codes for visiting. */
@@ -46,6 +48,14 @@ public class VisitorControlHandler {
return "Unknown error";
}
+
+ public CompletionCode getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
};
private VisitorControlSession session;
@@ -94,6 +104,17 @@ public class VisitorControlHandler {
}
/**
+ * Returns true iff the statistics reported by the visiting session indicates at least one
+ * bucket has been completely visited.
+ *
+ * Not thread safe, so should only be called on a quiescent session after waitUntilDone
+ * has completed successfully.
+ */
+ public boolean hasVisitedAnyBuckets() {
+ return ((currentStatistics != null) && (currentStatistics.getBucketsVisited() > 0));
+ }
+
+ /**
* Callback called when the visitor is done.
*
* @param code the completion code
@@ -130,25 +151,52 @@ public class VisitorControlHandler {
* Waits until visiting is done, or the given timeout (in ms) expires.
* Will wait forever if timeout is 0.
*
- * @param timeoutMs The maximum amount of milliseconds to wait.
- * @return True if visiting is done (either by error or success).
- * @throws InterruptedException If an interrupt signal was received while waiting.
+ * @param timeout Maximum time duration to wait before returning.
+ * @return True if visiting is done (either by error or success), false if session has timed out.
+ * @throws InterruptedException If an interrupt signal was received while waiting.
*/
- public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
+ public boolean waitUntilDone(Duration timeout) throws InterruptedException {
synchronized (this) {
- if (completed) return true;
- if (timeoutMs == 0) {
+ if (completed) {
+ return true;
+ }
+ if (timeout.isZero()) {
while (!completed) {
wait();
}
} else {
- wait(timeoutMs);
+ wait(timeout.toMillis());
}
return completed;
}
}
/**
+ * Waits until visiting is done, or the given timeout (in ms) expires.
+ * Will wait forever if timeout is 0.
+ *
+ * @param timeoutMs The maximum amount of milliseconds to wait.
+ * @return True if visiting is done (either by error or success), false if session has timed out.
+ * @throws InterruptedException If an interrupt signal was received while waiting.
+ *
+ * TODO deprecate this in favor of waitUntilDone(Duration)
+ */
+ public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
+ return waitUntilDone(Duration.ofMillis(timeoutMs));
+ }
+
+ /**
+ * Waits until visiting is done. Session timeout implicitly completes
+ * the visitor session, but will set an unsuccessful result code.
+ *
+ * @throws InterruptedException If an interrupt signal was received while waiting.
+ */
+ public void waitUntilDone() throws InterruptedException {
+ final boolean done = waitUntilDone(Duration.ZERO);
+ assert done : "Infinite waitUntilDone timeout should always complete";
+ }
+
+ /**
* Abort this visitor
*/
public void abort() { session.abort(); }
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
index 4296407d633..4218417437c 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
@@ -10,7 +10,7 @@ package com.yahoo.documentapi;
* kinds of visitor sessions, such as acking visitor data and aborting the
* session.
*
- * @author <a href="mailto:humbe@yahoo-inc.com">H&aring;kon Humberset</a>
+ * @author Håkon Humberset
*/
public interface VisitorControlSession {
/**
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 306103f2912..56edc7eb42a 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -883,8 +883,7 @@ public class MessageBusVisitorSession implements VisitorSession {
if (isFatalError(reply)) {
if (params.skipBucketsOnFatalErrors()) {
- progress.getToken().addFailedBucket(bucket, subProgress, message);
- progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
+ markBucketProgressAsFailed(bucket, subProgress, message);
} else {
reportVisitorError(message);
transitionTo(new StateDescription(State.FAILED, message));
@@ -903,6 +902,11 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
+ private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) {
+ progress.getToken().addFailedBucket(bucket, subProgress, message);
+ progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
+ }
+
private boolean enoughHitsReceived() {
if (params.getMaxFirstPassHits() != -1
&& statistics.getDocumentsReturned() >= params.getMaxFirstPassHits())
@@ -1024,7 +1028,6 @@ public class MessageBusVisitorSession implements VisitorSession {
private void handleWrongDistributionReply(WrongDistributionReply reply) {
try {
- // Classnames clash with documentapi classes, so be explicit
ClusterState newState = new ClusterState(reply.getSystemState());
int stateBits = newState.getDistributionBitCount();
if (stateBits != progress.getIterator().getDistributionBitCount()) {
@@ -1123,7 +1126,7 @@ public class MessageBusVisitorSession implements VisitorSession {
synchronized (completionMonitor) {
// If we are destroying the session before it has completed (e.g. because
// waitUntilDone timed out or an interactive visiting was interrupted)
- // set us to aborted state so that we'll seize
+ // set us to aborted state so that we'll seize sending new visitors.
if (!done) {
transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion"));
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java
new file mode 100644
index 00000000000..d8340e6a3f6
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java
@@ -0,0 +1,39 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.test;
+
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.vdslib.VisitorStatistics;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VisitorControlHandlerTest {
+
+ @Test
+ public void has_visited_any_buckets_is_false_if_no_bucket_stats_recorded() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ assertFalse(handler.hasVisitedAnyBuckets());
+ }
+
+ @Test
+ public void has_visited_any_buckets_is_false_if_zero_buckets_visited() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ VisitorStatistics stats = new VisitorStatistics();
+ stats.setBucketsVisited(0);
+ handler.onVisitorStatistics(stats);
+
+ assertFalse(handler.hasVisitedAnyBuckets());
+ }
+
+ @Test
+ public void has_visited_any_buckets_is_true_if_more_than_zero_buckets_visited() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ VisitorStatistics stats = new VisitorStatistics();
+ stats.setBucketsVisited(1);
+ handler.onVisitorStatistics(stats);
+
+ assertTrue(handler.hasVisitedAnyBuckets());
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
index aed2e2674cb..9adb0a09016 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
@@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
/**
* Sends operations to messagebus via document api.
@@ -39,8 +38,13 @@ import java.util.Set;
*/
public class OperationHandlerImpl implements OperationHandler {
+ public interface ClusterEnumerator {
+ List<ClusterDef> enumerateClusters();
+ }
+
public static final int VISIT_TIMEOUT_MS = 120000;
private final DocumentAccess documentAccess;
+ private final ClusterEnumerator clusterEnumerator;
private static final class SyncSessionFactory extends ResourceFactory<SyncSession> {
private final DocumentAccess documentAccess;
@@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler {
private final ConcurrentResourcePool<SyncSession> syncSessions;
public OperationHandlerImpl(DocumentAccess documentAccess) {
+ this(documentAccess, () -> new ClusterList("client").getStorageClusters());
+ }
+
+ public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) {
this.documentAccess = documentAccess;
+ this.clusterEnumerator = clusterEnumerator;
syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess));
}
@@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
+ private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException {
+ final VisitorControlHandler.Result result = handler.getResult();
+ if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) {
+ if (! handler.hasVisitedAnyBuckets()) {
+ throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
+ } // else: some progress has been made, let client continue with new token.
+ } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) {
+ throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
+ }
+ }
+
private VisitResult doVisit(
VisitorControlHandler visitorControlHandler,
LocalDataVisitorHandler localDataVisitorHandler,
RestUri restUri) throws RestApiException {
try {
- if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) {
- throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
- }
- if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) {
- throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
- }
+ visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures.
+ throwIfFatalVisitingError(visitorControlHandler, restUri);
} catch (InterruptedException e) {
throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED));
}
@@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
- private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
- List<ClusterDef> clusters = new ClusterList("client").getStorageClusters();
+ private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
+ final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters();
return resolveClusterRoute(wantedCluster, clusters);
}
@@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler {
params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
params.setToTimestamp(0L);
params.setFromTimestamp(0L);
+ params.setSessionTimeoutMs(VISIT_TIMEOUT_MS);
params.visitInconsistentBuckets(true);
params.setVisitorOrdering(VisitorOrdering.ASCENDING);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
index 8b7276441f2..e022fa70e31 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
@@ -1,18 +1,30 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vespaclient.ClusterDef;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OperationHandlerImplTest {
@@ -63,9 +75,7 @@ public class OperationHandlerImplTest {
try {
OperationHandlerImpl.resolveClusterRoute(Optional.of("wrong"), clusterDef);
} catch(RestApiException e) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- e.getResponse().render(stream);
- String errorMsg = new String( stream.toByteArray());
+ String errorMsg = renderRestApiExceptionAsString(e);
assertThat(errorMsg, is("{\"errors\":[{\"description\":" +
"\"MISSING_CLUSTER Your vespa cluster contains the content clusters foo2 (configId2), foo (configId)," +
" foo3 (configId2), not wrong. Please select a valid vespa cluster.\",\"id\":-9}]}"));
@@ -73,4 +83,77 @@ public class OperationHandlerImplTest {
}
fail("Expected exception");
}
-} \ No newline at end of file
+
+ private String renderRestApiExceptionAsString(RestApiException e) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ e.getResponse().render(stream);
+ return new String( stream.toByteArray());
+ }
+
+ private class OperationHandlerImplFixture {
+ DocumentAccess documentAccess = mock(DocumentAccess.class);
+ AtomicReference<VisitorParameters> assignedParameters = new AtomicReference<>();
+ VisitorControlHandler.CompletionCode completionCode = VisitorControlHandler.CompletionCode.SUCCESS;
+ int bucketsVisited = 0;
+
+ OperationHandlerImpl createHandler() throws Exception {
+ VisitorSession visitorSession = mock(VisitorSession.class);
+ // Pre-bake an already completed session
+ when(documentAccess.createVisitorSession(any(VisitorParameters.class))).thenAnswer(p -> {
+ VisitorParameters params = (VisitorParameters)p.getArguments()[0];
+ assignedParameters.set(params);
+
+ VisitorStatistics statistics = new VisitorStatistics();
+ statistics.setBucketsVisited(bucketsVisited);
+ params.getControlHandler().onVisitorStatistics(statistics);
+
+ ProgressToken progress = new ProgressToken();
+ params.getControlHandler().onProgress(progress);
+
+ params.getControlHandler().onDone(completionCode, "bork bork");
+ return visitorSession;
+ });
+ OperationHandlerImpl.ClusterEnumerator clusterEnumerator = () -> Arrays.asList(new ClusterDef("foo", "configId"));
+ return new OperationHandlerImpl(documentAccess, clusterEnumerator);
+ }
+ }
+
+ private static RestUri dummyVisitUri() throws Exception {
+ return new RestUri(new URI("http://localhost/document/v1/namespace/document-type/docid/"));
+ }
+
+ @Test
+ public void timeout_without_buckets_visited_throws_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 0;
+ // RestApiException hides its guts internally, so cannot trivially use @Rule directly to check for error category
+ try {
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ } catch (RestApiException e) {
+ assertThat(e.getResponse().getStatus(), is(500));
+ assertThat(renderRestApiExceptionAsString(e), containsString("Timed out"));
+ }
+ }
+
+ @Test
+ public void timeout_with_buckets_visited_does_not_throw_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 1;
+
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ }
+
+ @Test
+ public void handler_sets_default_visitor_session_timeout_parameter() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ OperationHandlerImpl handler = fixture.createHandler();
+
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+
+ assertThat(fixture.assignedParameters.get().getSessionTimeoutMs(), is((long)OperationHandlerImpl.VISIT_TIMEOUT_MS));
+ }
+}