summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java64
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java11
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java39
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java35
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java93
6 files changed, 217 insertions, 27 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
index b46308e0daf..4d05125f417 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
@@ -3,13 +3,15 @@ package com.yahoo.documentapi;
import com.yahoo.vdslib.VisitorStatistics;
+import java.time.Duration;
+
/**
* A class for controlling a visitor supplied through visitor parameters when
* creating the visitor session. The class defines callbacks for reporting
* progress and that the visitor is done. If you want to reimplement the
* default behavior of those callbacks, you can write your own subclass.
*
- * @author <a href="mailto:humbe@yahoo-inc.com">H&aring;kon Humberset</a>
+ * @author Håkon Humberset
*/
public class VisitorControlHandler {
/** Possible completion codes for visiting. */
@@ -46,6 +48,14 @@ public class VisitorControlHandler {
return "Unknown error";
}
+
+ public CompletionCode getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
};
private VisitorControlSession session;
@@ -94,6 +104,17 @@ public class VisitorControlHandler {
}
/**
+ * Returns true iff the statistics reported by the visiting session indicates at least one
+ * bucket has been completely visited.
+ *
+ * Not thread safe, so should only be called on a quiescent session after waitUntilDone
+ * has completed successfully.
+ */
+ public boolean hasVisitedAnyBuckets() {
+ return ((currentStatistics != null) && (currentStatistics.getBucketsVisited() > 0));
+ }
+
+ /**
* Callback called when the visitor is done.
*
* @param code the completion code
@@ -130,25 +151,52 @@ public class VisitorControlHandler {
* Waits until visiting is done, or the given timeout (in ms) expires.
* Will wait forever if timeout is 0.
*
- * @param timeoutMs The maximum amount of milliseconds to wait.
- * @return True if visiting is done (either by error or success).
- * @throws InterruptedException If an interrupt signal was received while waiting.
+ * @param timeout Maximum time duration to wait before returning.
+ * @return True if visiting is done (either by error or success), false if session has timed out.
+ * @throws InterruptedException If an interrupt signal was received while waiting.
*/
- public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
+ public boolean waitUntilDone(Duration timeout) throws InterruptedException {
synchronized (this) {
- if (completed) return true;
- if (timeoutMs == 0) {
+ if (completed) {
+ return true;
+ }
+ if (timeout.isZero()) {
while (!completed) {
wait();
}
} else {
- wait(timeoutMs);
+ wait(timeout.toMillis());
}
return completed;
}
}
/**
+ * Waits until visiting is done, or the given timeout (in ms) expires.
+ * Will wait forever if timeout is 0.
+ *
+ * @param timeoutMs The maximum amount of milliseconds to wait.
+ * @return True if visiting is done (either by error or success), false if session has timed out.
+ * @throws InterruptedException If an interrupt signal was received while waiting.
+ *
+ * TODO deprecate this in favor of waitUntilDone(Duration)
+ */
+ public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
+ return waitUntilDone(Duration.ofMillis(timeoutMs));
+ }
+
+ /**
+ * Waits until visiting is done. Session timeout implicitly completes
+ * the visitor session, but will set an unsuccessful result code.
+ *
+ * @throws InterruptedException If an interrupt signal was received while waiting.
+ */
+ public void waitUntilDone() throws InterruptedException {
+ final boolean done = waitUntilDone(Duration.ZERO);
+ assert done : "Infinite waitUntilDone timeout should always complete";
+ }
+
+ /**
* Abort this visitor
*/
public void abort() { session.abort(); }
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
index 4296407d633..4218417437c 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
@@ -10,7 +10,7 @@ package com.yahoo.documentapi;
* kinds of visitor sessions, such as acking visitor data and aborting the
* session.
*
- * @author <a href="mailto:humbe@yahoo-inc.com">H&aring;kon Humberset</a>
+ * @author Håkon Humberset
*/
public interface VisitorControlSession {
/**
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 306103f2912..56edc7eb42a 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -883,8 +883,7 @@ public class MessageBusVisitorSession implements VisitorSession {
if (isFatalError(reply)) {
if (params.skipBucketsOnFatalErrors()) {
- progress.getToken().addFailedBucket(bucket, subProgress, message);
- progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
+ markBucketProgressAsFailed(bucket, subProgress, message);
} else {
reportVisitorError(message);
transitionTo(new StateDescription(State.FAILED, message));
@@ -903,6 +902,11 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
+ private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) {
+ progress.getToken().addFailedBucket(bucket, subProgress, message);
+ progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
+ }
+
private boolean enoughHitsReceived() {
if (params.getMaxFirstPassHits() != -1
&& statistics.getDocumentsReturned() >= params.getMaxFirstPassHits())
@@ -1024,7 +1028,6 @@ public class MessageBusVisitorSession implements VisitorSession {
private void handleWrongDistributionReply(WrongDistributionReply reply) {
try {
- // Classnames clash with documentapi classes, so be explicit
ClusterState newState = new ClusterState(reply.getSystemState());
int stateBits = newState.getDistributionBitCount();
if (stateBits != progress.getIterator().getDistributionBitCount()) {
@@ -1123,7 +1126,7 @@ public class MessageBusVisitorSession implements VisitorSession {
synchronized (completionMonitor) {
// If we are destroying the session before it has completed (e.g. because
// waitUntilDone timed out or an interactive visiting was interrupted)
- // set us to aborted state so that we'll seize
+ // set us to aborted state so that we'll seize sending new visitors.
if (!done) {
transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion"));
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java
new file mode 100644
index 00000000000..d8340e6a3f6
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java
@@ -0,0 +1,39 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.test;
+
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.vdslib.VisitorStatistics;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VisitorControlHandlerTest {
+
+ @Test
+ public void has_visited_any_buckets_is_false_if_no_bucket_stats_recorded() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ assertFalse(handler.hasVisitedAnyBuckets());
+ }
+
+ @Test
+ public void has_visited_any_buckets_is_false_if_zero_buckets_visited() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ VisitorStatistics stats = new VisitorStatistics();
+ stats.setBucketsVisited(0);
+ handler.onVisitorStatistics(stats);
+
+ assertFalse(handler.hasVisitedAnyBuckets());
+ }
+
+ @Test
+ public void has_visited_any_buckets_is_true_if_more_than_zero_buckets_visited() {
+ VisitorControlHandler handler = new VisitorControlHandler();
+ VisitorStatistics stats = new VisitorStatistics();
+ stats.setBucketsVisited(1);
+ handler.onVisitorStatistics(stats);
+
+ assertTrue(handler.hasVisitedAnyBuckets());
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
index aed2e2674cb..9adb0a09016 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
@@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
/**
* Sends operations to messagebus via document api.
@@ -39,8 +38,13 @@ import java.util.Set;
*/
public class OperationHandlerImpl implements OperationHandler {
+ public interface ClusterEnumerator {
+ List<ClusterDef> enumerateClusters();
+ }
+
public static final int VISIT_TIMEOUT_MS = 120000;
private final DocumentAccess documentAccess;
+ private final ClusterEnumerator clusterEnumerator;
private static final class SyncSessionFactory extends ResourceFactory<SyncSession> {
private final DocumentAccess documentAccess;
@@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler {
private final ConcurrentResourcePool<SyncSession> syncSessions;
public OperationHandlerImpl(DocumentAccess documentAccess) {
+ this(documentAccess, () -> new ClusterList("client").getStorageClusters());
+ }
+
+ public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) {
this.documentAccess = documentAccess;
+ this.clusterEnumerator = clusterEnumerator;
syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess));
}
@@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
+ private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException {
+ final VisitorControlHandler.Result result = handler.getResult();
+ if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) {
+ if (! handler.hasVisitedAnyBuckets()) {
+ throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
+ } // else: some progress has been made, let client continue with new token.
+ } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) {
+ throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
+ }
+ }
+
private VisitResult doVisit(
VisitorControlHandler visitorControlHandler,
LocalDataVisitorHandler localDataVisitorHandler,
RestUri restUri) throws RestApiException {
try {
- if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) {
- throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
- }
- if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) {
- throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
- }
+ visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures.
+ throwIfFatalVisitingError(visitorControlHandler, restUri);
} catch (InterruptedException e) {
throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED));
}
@@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
- private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
- List<ClusterDef> clusters = new ClusterList("client").getStorageClusters();
+ private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
+ final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters();
return resolveClusterRoute(wantedCluster, clusters);
}
@@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler {
params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
params.setToTimestamp(0L);
params.setFromTimestamp(0L);
+ params.setSessionTimeoutMs(VISIT_TIMEOUT_MS);
params.visitInconsistentBuckets(true);
params.setVisitorOrdering(VisitorOrdering.ASCENDING);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
index 8b7276441f2..e022fa70e31 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
@@ -1,18 +1,30 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vespaclient.ClusterDef;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OperationHandlerImplTest {
@@ -63,9 +75,7 @@ public class OperationHandlerImplTest {
try {
OperationHandlerImpl.resolveClusterRoute(Optional.of("wrong"), clusterDef);
} catch(RestApiException e) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- e.getResponse().render(stream);
- String errorMsg = new String( stream.toByteArray());
+ String errorMsg = renderRestApiExceptionAsString(e);
assertThat(errorMsg, is("{\"errors\":[{\"description\":" +
"\"MISSING_CLUSTER Your vespa cluster contains the content clusters foo2 (configId2), foo (configId)," +
" foo3 (configId2), not wrong. Please select a valid vespa cluster.\",\"id\":-9}]}"));
@@ -73,4 +83,77 @@ public class OperationHandlerImplTest {
}
fail("Expected exception");
}
-} \ No newline at end of file
+
+ private String renderRestApiExceptionAsString(RestApiException e) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ e.getResponse().render(stream);
+ return new String( stream.toByteArray());
+ }
+
+ private class OperationHandlerImplFixture {
+ DocumentAccess documentAccess = mock(DocumentAccess.class);
+ AtomicReference<VisitorParameters> assignedParameters = new AtomicReference<>();
+ VisitorControlHandler.CompletionCode completionCode = VisitorControlHandler.CompletionCode.SUCCESS;
+ int bucketsVisited = 0;
+
+ OperationHandlerImpl createHandler() throws Exception {
+ VisitorSession visitorSession = mock(VisitorSession.class);
+ // Pre-bake an already completed session
+ when(documentAccess.createVisitorSession(any(VisitorParameters.class))).thenAnswer(p -> {
+ VisitorParameters params = (VisitorParameters)p.getArguments()[0];
+ assignedParameters.set(params);
+
+ VisitorStatistics statistics = new VisitorStatistics();
+ statistics.setBucketsVisited(bucketsVisited);
+ params.getControlHandler().onVisitorStatistics(statistics);
+
+ ProgressToken progress = new ProgressToken();
+ params.getControlHandler().onProgress(progress);
+
+ params.getControlHandler().onDone(completionCode, "bork bork");
+ return visitorSession;
+ });
+ OperationHandlerImpl.ClusterEnumerator clusterEnumerator = () -> Arrays.asList(new ClusterDef("foo", "configId"));
+ return new OperationHandlerImpl(documentAccess, clusterEnumerator);
+ }
+ }
+
+ private static RestUri dummyVisitUri() throws Exception {
+ return new RestUri(new URI("http://localhost/document/v1/namespace/document-type/docid/"));
+ }
+
+ @Test
+ public void timeout_without_buckets_visited_throws_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 0;
+ // RestApiException hides its guts internally, so cannot trivially use @Rule directly to check for error category
+ try {
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ } catch (RestApiException e) {
+ assertThat(e.getResponse().getStatus(), is(500));
+ assertThat(renderRestApiExceptionAsString(e), containsString("Timed out"));
+ }
+ }
+
+ @Test
+ public void timeout_with_buckets_visited_does_not_throw_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 1;
+
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ }
+
+ @Test
+ public void handler_sets_default_visitor_session_timeout_parameter() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ OperationHandlerImpl handler = fixture.createHandler();
+
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+
+ assertThat(fixture.assignedParameters.get().getSessionTimeoutMs(), is((long)OperationHandlerImpl.VISIT_TIMEOUT_MS));
+ }
+}