summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java35
1 files changed, 26 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
index aed2e2674cb..9adb0a09016 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/OperationHandlerImpl.java
@@ -30,7 +30,6 @@ import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
/**
* Sends operations to messagebus via document api.
@@ -39,8 +38,13 @@ import java.util.Set;
*/
public class OperationHandlerImpl implements OperationHandler {
+ public interface ClusterEnumerator {
+ List<ClusterDef> enumerateClusters();
+ }
+
public static final int VISIT_TIMEOUT_MS = 120000;
private final DocumentAccess documentAccess;
+ private final ClusterEnumerator clusterEnumerator;
private static final class SyncSessionFactory extends ResourceFactory<SyncSession> {
private final DocumentAccess documentAccess;
@@ -56,7 +60,12 @@ public class OperationHandlerImpl implements OperationHandler {
private final ConcurrentResourcePool<SyncSession> syncSessions;
public OperationHandlerImpl(DocumentAccess documentAccess) {
+ this(documentAccess, () -> new ClusterList("client").getStorageClusters());
+ }
+
+ public OperationHandlerImpl(DocumentAccess documentAccess, ClusterEnumerator clusterEnumerator) {
this.documentAccess = documentAccess;
+ this.clusterEnumerator = clusterEnumerator;
syncSessions = new ConcurrentResourcePool<>(new SyncSessionFactory(documentAccess));
}
@@ -124,17 +133,24 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
+ private static void throwIfFatalVisitingError(VisitorControlHandler handler, RestUri restUri) throws RestApiException {
+ final VisitorControlHandler.Result result = handler.getResult();
+ if (result.getCode() == VisitorControlHandler.CompletionCode.TIMEOUT) {
+ if (! handler.hasVisitedAnyBuckets()) {
+ throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
+ } // else: some progress has been made, let client continue with new token.
+ } else if (result.getCode() != VisitorControlHandler.CompletionCode.SUCCESS) {
+ throw new RestApiException(Response.createErrorResponse(400, result.toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
+ }
+ }
+
private VisitResult doVisit(
VisitorControlHandler visitorControlHandler,
LocalDataVisitorHandler localDataVisitorHandler,
RestUri restUri) throws RestApiException {
try {
- if (! visitorControlHandler.waitUntilDone(VISIT_TIMEOUT_MS)) {
- throw new RestApiException(Response.createErrorResponse(500, "Timed out", restUri, RestUri.apiErrorCodes.TIME_OUT));
- }
- if (visitorControlHandler.getResult().code != VisitorControlHandler.CompletionCode.SUCCESS) {
- throw new RestApiException(Response.createErrorResponse(400, visitorControlHandler.getResult().toString(), RestUri.apiErrorCodes.VISITOR_ERROR));
- }
+ visitorControlHandler.waitUntilDone(); // VisitorParameters' session timeout implicitly triggers timeout failures.
+ throwIfFatalVisitingError(visitorControlHandler, restUri);
} catch (InterruptedException e) {
throw new RestApiException(Response.createErrorResponse(500, ExceptionUtils.getStackTrace(e), restUri, RestUri.apiErrorCodes.INTERRUPTED));
}
@@ -238,8 +254,8 @@ public class OperationHandlerImpl implements OperationHandler {
}
}
- private static String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
- List<ClusterDef> clusters = new ClusterList("client").getStorageClusters();
+ private String resolveClusterRoute(Optional<String> wantedCluster) throws RestApiException {
+ final List<ClusterDef> clusters = clusterEnumerator.enumerateClusters();
return resolveClusterRoute(wantedCluster, clusters);
}
@@ -304,6 +320,7 @@ public class OperationHandlerImpl implements OperationHandler {
params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
params.setToTimestamp(0L);
params.setFromTimestamp(0L);
+ params.setSessionTimeoutMs(VISIT_TIMEOUT_MS);
params.visitInconsistentBuckets(true);
params.setVisitorOrdering(VisitorOrdering.ASCENDING);