diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java index aed2e2674cb..9adb0a09016 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java @@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; -import java.util.Set; /** * Sends operations to messagebus via document api. @@ -39,8 +38,13 @@ import java.util.Set; */ public class OperationHandlerImpl implements OperationHandler { + public interface ClusterEnumerator { + List<ClusterDef> enumerateClusters(); + } + public static final int VISIT_TIMEOUT_MS = 120000; private final DocumentAccess documentAccess; + private final ClusterEnumerator clusterEnumerator; private static final class SyncSessionFactory extends ResourceFactory<SyncSession> { private final DocumentAccess documentAccess; @@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler { private final ConcurrentResourcePool<SyncSession> syncSessions; public OperationHandlerImpl(DocumentAccess documentAccess) { + this(documentAccess, () -> new ClusterList("client").getStorageClusters()); + } + + public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) { this.documentAccess = documentAccess; + this.clusterEnumerator = clusterEnumerator; syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess)); } @@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler { } } + private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException { + final VisitorControlHandler.Result result = handler.getResult(); + if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) { + if (! handler.hasVisitedAnyBuckets()) { + throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); + } // else: some progress has been made, let client continue with new token. + } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) { + throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); + } + } + private VisitResult doVisit( VisitorControlHandler visitorControlHandler, LocalDataVisitorHandler localDataVisitorHandler, RestUri restUri) throws RestApiException { try { - if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) { - throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT)); - } - if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) { - throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR)); - } + visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures. + throwIfFatalVisitingError(visitorControlHandler, restUri); } catch (InterruptedException e) { throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED)); } @@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler { } } - private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { - List<ClusterDef> clusters = new ClusterList("client").getStorageClusters(); + private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException { + final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters(); return resolveClusterRoute(wantedCluster, clusters); } @@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler { params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1)); params.setToTimestamp(0L); params.setFromTimestamp(0L); + params.setSessionTimeoutMs(VISIT_TIMEOUT_MS); params.visitInconsistentBuckets(true); params.setVisitorOrdering(VisitorOrdering.ASCENDING); |