diff options
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 114 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java index aed2e2674cb..9adb0a09016 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java @@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; -import java.util.Set; /** * Sends operations to messagebus via document api. @@ -39,8 +38,13 @@ import java.util.Set; */ public class OperationHandlerImpl implements OperationHandler { + public interface ClusterEnumerator { + List<ClusterDef> enumerateClusters(); + } + public static final int VISIT_TIMEOUT_MS = 120000; private final DocumentAccess documentAccess; + private final ClusterEnumerator clusterEnumerator; private static final class SyncSessionFactory extends ResourceFactory<SyncSession> { private final DocumentAccess documentAccess; @@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler { private final ConcurrentResourcePool<SyncSession> syncSessions; public OperationHandlerImpl(DocumentAccess documentAccess) { + this(documentAccess, () -> new ClusterList("client").getStorageClusters()); + } + + public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) { this.documentAccess = documentAccess; + this.clusterEnumerator = clusterEnumerator; syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess)); } @@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler { } } + private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException { + final VisitorControlHandler.Result result = handler.getResult(); + if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) { + if (! handler.hasVisitedAnyBuckets()) { + throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); + } // else: some progress has been made, let client continue with new token. + } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) { + throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); + } + } + private VisitResult doVisit( VisitorControlHandler visitorControlHandler, LocalDataVisitorHandler localDataVisitorHandler, RestUri restUri) throws RestApiException { try { - if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) { - throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); - } - if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) { - throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); - } + visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures. + throwIfFatalVisitingError(visitorControlHandler, restUri); } catch (InterruptedException e) { throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED)); } @@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler { } } - private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { - List<ClusterDef> clusters = new ClusterList("client").getStorageClusters(); + private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { + final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters(); return resolveClusterRoute(wantedCluster, clusters); } @@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler { params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1)); params.setToTimestamp(0L); params.setFromTimestamp(0L); + params.setSessionTimeoutMs(VISIT_TIMEOUT_MS); params.visitInconsistentBuckets(true); params.setVisitorOrdering(VisitorOrdering.ASCENDING); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java index 8b7276441f2..e022fa70e31 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java @@ -1,18 +1,30 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.restapi; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.vdslib.VisitorStatistics; import com.yahoo.vespaclient.ClusterDef; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.core.Is.is; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OperationHandlerImplTest { @@ -63,9 +75,7 @@ public class OperationHandlerImplTest { try { OperationHandlerImpl.resolveClusterRoute(Optional.of("wrong"), clusterDef); } catch(RestApiException e) { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - e.getResponse().render(stream); - String errorMsg = new String( stream.toByteArray()); + String errorMsg = renderRestApiExceptionAsString(e); assertThat(errorMsg, is("{\"errors\":[{\"description\":" + "\"MISSING_CLUSTER Your vespa cluster contains the content clusters foo2 (configId2), foo (configId)," + " foo3 (configId2), not wrong. Please select a valid vespa cluster.\",\"id\":-9}]}")); @@ -73,4 +83,77 @@ public class OperationHandlerImplTest { } fail("Expected exception"); } -}
\ No newline at end of file + + private String renderRestApiExceptionAsString(RestApiException e) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + e.getResponse().render(stream); + return new String( stream.toByteArray()); + } + + private class OperationHandlerImplFixture { + DocumentAccess documentAccess = mock(DocumentAccess.class); + AtomicReference<VisitorParameters> assignedParameters = new AtomicReference<>(); + VisitorControlHandler.CompletionCode completionCode = VisitorControlHandler.CompletionCode.SUCCESS; + int bucketsVisited = 0; + + OperationHandlerImpl createHandler() throws Exception { + VisitorSession visitorSession = mock(VisitorSession.class); + // Pre-bake an already completed session + when(documentAccess.createVisitorSession(any(VisitorParameters.class))).thenAnswer(p -> { + VisitorParameters params = (VisitorParameters)p.getArguments()[0]; + assignedParameters.set(params); + + VisitorStatistics statistics = new VisitorStatistics(); + statistics.setBucketsVisited(bucketsVisited); + params.getControlHandler().onVisitorStatistics(statistics); + + ProgressToken progress = new ProgressToken(); + params.getControlHandler().onProgress(progress); + + params.getControlHandler().onDone(completionCode, "bork bork"); + return visitorSession; + }); + OperationHandlerImpl.ClusterEnumerator clusterEnumerator = () -> Arrays.asList(new ClusterDef("foo", "configId")); + return new OperationHandlerImpl(documentAccess, clusterEnumerator); + } + } + + private static RestUri dummyVisitUri() throws Exception { + return new RestUri(new URI("http://localhost/document/v1/namespace/document-type/docid/")); + } + + @Test + public void timeout_without_buckets_visited_throws_timeout_error() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT; + fixture.bucketsVisited = 0; + // RestApiException hides its guts internally, so cannot trivially use @Rule directly to check for error category + try { + OperationHandlerImpl handler = fixture.createHandler(); + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + } catch (RestApiException e) { + assertThat(e.getResponse().getStatus(), is(500)); + assertThat(renderRestApiExceptionAsString(e), containsString("Timed out")); + } + } + + @Test + public void timeout_with_buckets_visited_does_not_throw_timeout_error() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT; + fixture.bucketsVisited = 1; + + OperationHandlerImpl handler = fixture.createHandler(); + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + } + + @Test + public void handler_sets_default_visitor_session_timeout_parameter() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + OperationHandlerImpl handler = fixture.createHandler(); + + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + + assertThat(fixture.assignedParameters.get().getSessionTimeoutMs(), is((long)OperationHandlerImpl.VISIT_TIMEOUT_MS)); + } +} |