summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java35
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java93
2 files changed, 114 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
index aed2e2674cb..9adb0a09016 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
@@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
/**
* Sends operations to messagebus via document api.
@@ -39,8 +38,13 @@ import java.util.Set;
*/
public class OperationHandlerImpl implements OperationHandler {
+ public interface ClusterEnumerator {
+ List<ClusterDef> enumerateClusters();
+ }
+
public static final int VISIT_TIMEOUT_MS = 120000;
private final DocumentAccess documentAccess;
+ private final ClusterEnumerator clusterEnumerator;
private static final class SyncSessionFactory extends ResourceFactory<SyncSession> {
private final DocumentAccess documentAccess;
@@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler {
private final ConcurrentResourcePool<SyncSession> syncSessions;
public OperationHandlerImpl(DocumentAccess documentAccess) {
+ this(documentAccess, () -> new ClusterList("client").getStorageClusters());
+ }
+
+ public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) {
this.documentAccess = documentAccess;
+ this.clusterEnumerator = clusterEnumerator;
syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess));
}
@@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
+ private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException {
+ final VisitorControlHandler.Result result = handler.getResult();
+ if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) {
+ if (! handler.hasVisitedAnyBuckets()) {
+ throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
+ } // else: some progress has been made, let client continue with new token.
+ } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) {
+ throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
+ }
+ }
+
private VisitResult doVisit(
VisitorControlHandler visitorControlHandler,
LocalDataVisitorHandler localDataVisitorHandler,
RestUri restUri) throws RestApiException {
try {
- if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) {
- throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
- }
- if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) {
- throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
- }
+ visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures.
+ throwIfFatalVisitingError(visitorControlHandler, restUri);
} catch (InterruptedException e) {
throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED));
}
@@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
- private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
- List<ClusterDef> clusters = new ClusterList("client").getStorageClusters();
+ private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
+ final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters();
return resolveClusterRoute(wantedCluster, clusters);
}
@@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler {
params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
params.setToTimestamp(0L);
params.setFromTimestamp(0L);
+ params.setSessionTimeoutMs(VISIT_TIMEOUT_MS);
params.visitInconsistentBuckets(true);
params.setVisitorOrdering(VisitorOrdering.ASCENDING);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
index 8b7276441f2..e022fa70e31 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java
@@ -1,18 +1,30 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vespaclient.ClusterDef;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OperationHandlerImplTest {
@@ -63,9 +75,7 @@ public class OperationHandlerImplTest {
try {
OperationHandlerImpl.resolveClusterRoute(Optional.of("wrong"), clusterDef);
} catch(RestApiException e) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- e.getResponse().render(stream);
- String errorMsg = new String( stream.toByteArray());
+ String errorMsg = renderRestApiExceptionAsString(e);
assertThat(errorMsg, is("{\"errors\":[{\"description\":" +
"\"MISSING_CLUSTER Your vespa cluster contains the content clusters foo2 (configId2), foo (configId)," +
" foo3 (configId2), not wrong. Please select a valid vespa cluster.\",\"id\":-9}]}"));
@@ -73,4 +83,77 @@ public class OperationHandlerImplTest {
}
fail("Expected exception");
}
-} \ No newline at end of file
+
+ private String renderRestApiExceptionAsString(RestApiException e) throws IOException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ e.getResponse().render(stream);
+ return new String( stream.toByteArray());
+ }
+
+ private class OperationHandlerImplFixture {
+ DocumentAccess documentAccess = mock(DocumentAccess.class);
+ AtomicReference<VisitorParameters> assignedParameters = new AtomicReference<>();
+ VisitorControlHandler.CompletionCode completionCode = VisitorControlHandler.CompletionCode.SUCCESS;
+ int bucketsVisited = 0;
+
+ OperationHandlerImpl createHandler() throws Exception {
+ VisitorSession visitorSession = mock(VisitorSession.class);
+ // Pre-bake an already completed session
+ when(documentAccess.createVisitorSession(any(VisitorParameters.class))).thenAnswer(p -> {
+ VisitorParameters params = (VisitorParameters)p.getArguments()[0];
+ assignedParameters.set(params);
+
+ VisitorStatistics statistics = new VisitorStatistics();
+ statistics.setBucketsVisited(bucketsVisited);
+ params.getControlHandler().onVisitorStatistics(statistics);
+
+ ProgressToken progress = new ProgressToken();
+ params.getControlHandler().onProgress(progress);
+
+ params.getControlHandler().onDone(completionCode, "bork bork");
+ return visitorSession;
+ });
+ OperationHandlerImpl.ClusterEnumerator clusterEnumerator = () -> Arrays.asList(new ClusterDef("foo", "configId"));
+ return new OperationHandlerImpl(documentAccess, clusterEnumerator);
+ }
+ }
+
+ private static RestUri dummyVisitUri() throws Exception {
+ return new RestUri(new URI("http://localhost/document/v1/namespace/document-type/docid/"));
+ }
+
+ @Test
+ public void timeout_without_buckets_visited_throws_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 0;
+ // RestApiException hides its guts internally, so cannot trivially use @Rule directly to check for error category
+ try {
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ } catch (RestApiException e) {
+ assertThat(e.getResponse().getStatus(), is(500));
+ assertThat(renderRestApiExceptionAsString(e), containsString("Timed out"));
+ }
+ }
+
+ @Test
+ public void timeout_with_buckets_visited_does_not_throw_timeout_error() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT;
+ fixture.bucketsVisited = 1;
+
+ OperationHandlerImpl handler = fixture.createHandler();
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+ }
+
+ @Test
+ public void handler_sets_default_visitor_session_timeout_parameter() throws Exception {
+ OperationHandlerImplFixture fixture = new OperationHandlerImplFixture();
+ OperationHandlerImpl handler = fixture.createHandler();
+
+ handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty());
+
+ assertThat(fixture.assignedParameters.get().getSessionTimeoutMs(), is((long)OperationHandlerImpl.VISIT_TIMEOUT_MS));
+ }
+}