diff options
6 files changed, 217 insertions, 27 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java index b46308e0daf..4d05125f417 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java @@ -3,13 +3,15 @@ package com.yahoo.documentapi; import com.yahoo.vdslib.VisitorStatistics; +import java.time.Duration; + /** * A class for controlling a visitor supplied through visitor parameters when * creating the visitor session. The class defines callbacks for reporting * progress and that the visitor is done. If you want to reimplement the * default behavior of those callbacks, you can write your own subclass. * - * @author <a href="mailto:humbe@yahoo-inc.com">Håkon Humberset</a> + * @author Håkon Humberset */ public class VisitorControlHandler { /** Possible completion codes for visiting. */ @@ -46,6 +48,14 @@ public class VisitorControlHandler { return "Unknown error"; } + + public CompletionCode getCode() { + return code; + } + + public String getMessage() { + return message; + } }; private VisitorControlSession session; @@ -94,6 +104,17 @@ public class VisitorControlHandler { } /** + * Returns true iff the statistics reported by the visiting session indicates at least one + * bucket has been completely visited. + * + * Not thread safe, so should only be called on a quiescent session after waitUntilDone + * has completed successfully. + */ + public boolean hasVisitedAnyBuckets() { + return ((currentStatistics != null) && (currentStatistics.getBucketsVisited() > 0)); + } + + /** * Callback called when the visitor is done. * * @param code the completion code @@ -130,25 +151,52 @@ public class VisitorControlHandler { * Waits until visiting is done, or the given timeout (in ms) expires. * Will wait forever if timeout is 0. * - * @param timeoutMs The maximum amount of milliseconds to wait. - * @return True if visiting is done (either by error or success). - * @throws InterruptedException If an interrupt signal was received while waiting. + * @param timeout Maximum time duration to wait before returning. + * @return True if visiting is done (either by error or success), false if session has timed out. + * @throws InterruptedException If an interrupt signal was received while waiting. */ - public boolean waitUntilDone(long timeoutMs) throws InterruptedException { + public boolean waitUntilDone(Duration timeout) throws InterruptedException { synchronized (this) { - if (completed) return true; - if (timeoutMs == 0) { + if (completed) { + return true; + } + if (timeout.isZero()) { while (!completed) { wait(); } } else { - wait(timeoutMs); + wait(timeout.toMillis()); } return completed; } } /** + * Waits until visiting is done, or the given timeout (in ms) expires. + * Will wait forever if timeout is 0. + * + * @param timeoutMs The maximum amount of milliseconds to wait. + * @return True if visiting is done (either by error or success), false if session has timed out. + * @throws InterruptedException If an interrupt signal was received while waiting. + * + * TODO deprecate this in favor of waitUntilDone(Duration) + */ + public boolean waitUntilDone(long timeoutMs) throws InterruptedException { + return waitUntilDone(Duration.ofMillis(timeoutMs)); + } + + /** + * Waits until visiting is done. Session timeout implicitly completes + * the visitor session, but will set an unsuccessful result code. + * + * @throws InterruptedException If an interrupt signal was received while waiting. + */ + public void waitUntilDone() throws InterruptedException { + final boolean done = waitUntilDone(Duration.ZERO); + assert done : "Infinite waitUntilDone timeout should always complete"; + } + + /** * Abort this visitor */ public void abort() { session.abort(); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java index 4296407d633..4218417437c 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java @@ -10,7 +10,7 @@ package com.yahoo.documentapi; * kinds of visitor sessions, such as acking visitor data and aborting the * session. * - * @author <a href="mailto:humbe@yahoo-inc.com">Håkon Humberset</a> + * @author Håkon Humberset */ public interface VisitorControlSession { /** diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 306103f2912..56edc7eb42a 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -883,8 +883,7 @@ public class MessageBusVisitorSession implements VisitorSession { if (isFatalError(reply)) { if (params.skipBucketsOnFatalErrors()) { - progress.getToken().addFailedBucket(bucket, subProgress, message); - progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET); + markBucketProgressAsFailed(bucket, subProgress, message); } else { reportVisitorError(message); transitionTo(new StateDescription(State.FAILED, message)); @@ -903,6 +902,11 @@ public class MessageBusVisitorSession implements VisitorSession { } } + private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) { + progress.getToken().addFailedBucket(bucket, subProgress, message); + progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET); + } + private boolean enoughHitsReceived() { if (params.getMaxFirstPassHits() != -1 && statistics.getDocumentsReturned() >= params.getMaxFirstPassHits()) @@ -1024,7 +1028,6 @@ public class MessageBusVisitorSession implements VisitorSession { private void handleWrongDistributionReply(WrongDistributionReply reply) { try { - // Classnames clash with documentapi classes, so be explicit ClusterState newState = new ClusterState(reply.getSystemState()); int stateBits = newState.getDistributionBitCount(); if (stateBits != progress.getIterator().getDistributionBitCount()) { @@ -1123,7 +1126,7 @@ public class MessageBusVisitorSession implements VisitorSession { synchronized (completionMonitor) { // If we are destroying the session before it has completed (e.g. because // waitUntilDone timed out or an interactive visiting was interrupted) - // set us to aborted state so that we'll seize + // set us to aborted state so that we'll seize sending new visitors. if (!done) { transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion")); } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java new file mode 100644 index 00000000000..d8340e6a3f6 --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/VisitorControlHandlerTest.java @@ -0,0 +1,39 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.test; + +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.vdslib.VisitorStatistics; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class VisitorControlHandlerTest { + + @Test + public void has_visited_any_buckets_is_false_if_no_bucket_stats_recorded() { + VisitorControlHandler handler = new VisitorControlHandler(); + assertFalse(handler.hasVisitedAnyBuckets()); + } + + @Test + public void has_visited_any_buckets_is_false_if_zero_buckets_visited() { + VisitorControlHandler handler = new VisitorControlHandler(); + VisitorStatistics stats = new VisitorStatistics(); + stats.setBucketsVisited(0); + handler.onVisitorStatistics(stats); + + assertFalse(handler.hasVisitedAnyBuckets()); + } + + @Test + public void has_visited_any_buckets_is_true_if_more_than_zero_buckets_visited() { + VisitorControlHandler handler = new VisitorControlHandler(); + VisitorStatistics stats = new VisitorStatistics(); + stats.setBucketsVisited(1); + handler.onVisitorStatistics(stats); + + assertTrue(handler.hasVisitedAnyBuckets()); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java index aed2e2674cb..9adb0a09016 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java @@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; -import java.util.Set; /** * Sends operations to messagebus via document api. @@ -39,8 +38,13 @@ import java.util.Set; */ public class OperationHandlerImpl implements OperationHandler { + public interface ClusterEnumerator { + List<ClusterDef> enumerateClusters(); + } + public static final int VISIT_TIMEOUT_MS = 120000; private final DocumentAccess documentAccess; + private final ClusterEnumerator clusterEnumerator; private static final class SyncSessionFactory extends ResourceFactory<SyncSession> { private final DocumentAccess documentAccess; @@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler { private final ConcurrentResourcePool<SyncSession> syncSessions; public OperationHandlerImpl(DocumentAccess documentAccess) { + this(documentAccess, () -> new ClusterList("client").getStorageClusters()); + } + + public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) { this.documentAccess = documentAccess; + this.clusterEnumerator = clusterEnumerator; syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess)); } @@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler { } } + private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException { + final VisitorControlHandler.Result result = handler.getResult(); + if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) { + if (! handler.hasVisitedAnyBuckets()) { + throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); + } // else: some progress has been made, let client continue with new token. + } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) { + throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); + } + } + private VisitResult doVisit( VisitorControlHandler visitorControlHandler, LocalDataVisitorHandler localDataVisitorHandler, RestUri restUri) throws RestApiException { try { - if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) { - throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); - } - if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) { - throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); - } + visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures. + throwIfFatalVisitingError(visitorControlHandler, restUri); } catch (InterruptedException e) { throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED)); } @@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler { } } - private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { - List<ClusterDef> clusters = new ClusterList("client").getStorageClusters(); + private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { + final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters(); return resolveClusterRoute(wantedCluster, clusters); } @@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler { params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1)); params.setToTimestamp(0L); params.setFromTimestamp(0L); + params.setSessionTimeoutMs(VISIT_TIMEOUT_MS); params.visitInconsistentBuckets(true); params.setVisitorOrdering(VisitorOrdering.ASCENDING); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java index 8b7276441f2..e022fa70e31 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/OperationHandlerImplTest.java @@ -1,18 +1,30 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.restapi; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.vdslib.VisitorStatistics; import com.yahoo.vespaclient.ClusterDef; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.core.Is.is; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OperationHandlerImplTest { @@ -63,9 +75,7 @@ public class OperationHandlerImplTest { try { OperationHandlerImpl.resolveClusterRoute(Optional.of("wrong"), clusterDef); } catch(RestApiException e) { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - e.getResponse().render(stream); - String errorMsg = new String( stream.toByteArray()); + String errorMsg = renderRestApiExceptionAsString(e); assertThat(errorMsg, is("{\"errors\":[{\"description\":" + "\"MISSING_CLUSTER Your vespa cluster contains the content clusters foo2 (configId2), foo (configId)," + " foo3 (configId2), not wrong. Please select a valid vespa cluster.\",\"id\":-9}]}")); @@ -73,4 +83,77 @@ public class OperationHandlerImplTest { } fail("Expected exception"); } -}
\ No newline at end of file + + private String renderRestApiExceptionAsString(RestApiException e) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + e.getResponse().render(stream); + return new String( stream.toByteArray()); + } + + private class OperationHandlerImplFixture { + DocumentAccess documentAccess = mock(DocumentAccess.class); + AtomicReference<VisitorParameters> assignedParameters = new AtomicReference<>(); + VisitorControlHandler.CompletionCode completionCode = VisitorControlHandler.CompletionCode.SUCCESS; + int bucketsVisited = 0; + + OperationHandlerImpl createHandler() throws Exception { + VisitorSession visitorSession = mock(VisitorSession.class); + // Pre-bake an already completed session + when(documentAccess.createVisitorSession(any(VisitorParameters.class))).thenAnswer(p -> { + VisitorParameters params = (VisitorParameters)p.getArguments()[0]; + assignedParameters.set(params); + + VisitorStatistics statistics = new VisitorStatistics(); + statistics.setBucketsVisited(bucketsVisited); + params.getControlHandler().onVisitorStatistics(statistics); + + ProgressToken progress = new ProgressToken(); + params.getControlHandler().onProgress(progress); + + params.getControlHandler().onDone(completionCode, "bork bork"); + return visitorSession; + }); + OperationHandlerImpl.ClusterEnumerator clusterEnumerator = () -> Arrays.asList(new ClusterDef("foo", "configId")); + return new OperationHandlerImpl(documentAccess, clusterEnumerator); + } + } + + private static RestUri dummyVisitUri() throws Exception { + return new RestUri(new URI("http://localhost/document/v1/namespace/document-type/docid/")); + } + + @Test + public void timeout_without_buckets_visited_throws_timeout_error() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT; + fixture.bucketsVisited = 0; + // RestApiException hides its guts internally, so cannot trivially use @Rule directly to check for error category + try { + OperationHandlerImpl handler = fixture.createHandler(); + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + } catch (RestApiException e) { + assertThat(e.getResponse().getStatus(), is(500)); + assertThat(renderRestApiExceptionAsString(e), containsString("Timed out")); + } + } + + @Test + public void timeout_with_buckets_visited_does_not_throw_timeout_error() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + fixture.completionCode = VisitorControlHandler.CompletionCode.TIMEOUT; + fixture.bucketsVisited = 1; + + OperationHandlerImpl handler = fixture.createHandler(); + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + } + + @Test + public void handler_sets_default_visitor_session_timeout_parameter() throws Exception { + OperationHandlerImplFixture fixture = new OperationHandlerImplFixture(); + OperationHandlerImpl handler = fixture.createHandler(); + + handler.visit(dummyVisitUri(), "", Optional.empty(), Optional.empty()); + + assertThat(fixture.assignedParameters.get().getSessionTimeoutMs(), is((long)OperationHandlerImpl.VISIT_TIMEOUT_MS)); + } +} |