diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 99 |
1 files changed, 67 insertions, 32 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index e7f74161a8e..65744815ecf 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -55,6 +55,7 @@ import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; @@ -148,10 +149,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String FIELD_SET = "fieldSet"; private static final String SELECTION = "selection"; private static final String CLUSTER = "cluster"; + private static final String DESTINATION_CLUSTER = "destinationCluster"; private static final String CONTINUATION = "continuation"; private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount"; private static final String CONCURRENCY = "concurrency"; private static final String BUCKET_SPACE = "bucketSpace"; + private static final String TIME_CHUNK = "timeChunk"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; @@ -347,7 +350,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - VisitorParameters parameters = parseParameters(request, path); + VisitorParameters parameters = parseGetParameters(request, path); return () -> { visitAndWrite(request, parameters, handler); return true; // VisitorSession has its own throttle handling. @@ -358,8 +361,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { + StorageCluster destination = resolveCluster(Optional.of(requireProperty(request, DESTINATION_CLUSTER)), clusters); VisitorParameters parameters = parseParameters(request, path); - parameters.setRemoteDataHandler(getProperty(request, ROUTE).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + ROUTE + "'"))); + parameters.setRemoteDataHandler("[Content:cluster=" + destination.name() + "]"); // Bypass indexing. + parameters.setFieldSet(AllFields.NAME); return () -> { visitWithRemote(request, parameters, handler); return true; // VisitorSession has its own throttle handling. @@ -369,19 +374,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { - if (getProperty(request, SELECTION).isEmpty()) - throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); - return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); - IdIdString dummyId = new IdIdString("dummy", type, "", ""); + StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters); VisitorParameters parameters = parseParameters(request, path); parameters.setFieldSet(DocIdOnly.NAME); + String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); + IdIdString dummyId = new IdIdString("dummy", type, "", ""); DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); - update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection())); + update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); return () -> { - visitAndUpdate(request, parameters, handler, update, getProperty(request, ROUTE)); + visitAndUpdate(request, parameters, handler, update, cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -389,15 +392,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { - if (getProperty(request, SELECTION).isEmpty()) - throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); - enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseParameters(request, path); parameters.setFieldSet(DocIdOnly.NAME); - TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection()); + TestAndSetCondition condition = new TestAndSetCondition(requireProperty(request, SELECTION)); + StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters); return () -> { - visitAndDelete(request, parameters, handler, condition, getProperty(request, ROUTE)); + visitAndDelete(request, parameters, handler, condition, cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -478,7 +479,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .orElse(parameters()); for (String name : names) switch (name) { case CLUSTER: - parameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route()) + parameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).name()) .map(parameters::withRoute) .orElse(parameters); break; @@ -644,6 +645,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeStringField("message", message); } + synchronized void writeDocumentCount(long count) throws IOException { + json.writeNumberField("documentCount", count); + } + synchronized void writeDocId(DocumentId id) throws IOException { json.writeStringField("id", id.toString()); } @@ -916,10 +921,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) { + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) { int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1)); if (wantedDocumentCount <= 0) - throw new IllegalArgumentException("wantedDocumentCount must be positive"); + throw new IllegalArgumentException("wantedDocumentCount must be positive"); int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1)); if (concurrency <= 0) @@ -929,6 +934,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + VisitorParameters parameters = parseCommonParameters(request, path, cluster); + parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); + parameters.setMaxTotalHits(wantedDocumentCount); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); + parameters.visitInconsistentBuckets(true); + parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000)); + return parameters; + } + + private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) { + disallow(request, CONCURRENCY, FIELD_SET, ROUTE, WANTED_DOCUMENT_COUNT); + requireProperty(request, SELECTION); + VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER))); + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1)); + parameters.setSessionTimeoutMs(Math.max(1, getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L))); + return parameters; + } + + private VisitorParameters parseCommonParameters(HttpRequest request, DocumentPath path, Optional<String> cluster) { VisitorParameters parameters = new VisitorParameters(Stream.of(getProperty(request, SELECTION), path.documentType(), path.namespace().map(value -> "id.namespace=='" + value + "'"), @@ -940,15 +964,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .toString()); getProperty(request, CONTINUATION).map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); - parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); - parameters.setMaxTotalHits(wantedDocumentCount); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000)); - parameters.visitInconsistentBuckets(true); parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); StorageCluster storageCluster = resolveCluster(cluster, clusters); - parameters.setRoute(storageCluster.route()); + parameters.setRoute(storageCluster.name()); parameters.setBucketSpace(resolveBucket(storageCluster, path.documentType(), List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), @@ -969,7 +988,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - TestAndSetCondition condition, Optional<String> route) { + TestAndSetCondition condition, String route) { visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { DocumentRemove remove = new DocumentRemove(id); remove.setCondition(condition); @@ -978,7 +997,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - DocumentUpdate protoUpdate, Optional<String> route) { + DocumentUpdate protoUpdate, String route) { visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { DocumentUpdate update = new DocumentUpdate(protoUpdate); update.setId(id); @@ -987,11 +1006,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { + String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { visit(request, parameters, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters() - : parameters().withRoute(route.get())) + DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { outstanding.decrementAndGet(); switch (operationResponse.outcome()) { @@ -1057,7 +1075,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { callback.onEnd(response); switch (code) { case TIMEOUT: - if ( ! hasVisitedAnyBuckets()) { + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { response.writeMessage("No buckets visited within timeout of " + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); response.respond(Response.Status.GATEWAY_TIMEOUT); @@ -1066,14 +1084,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { case SUCCESS: // Intentional fallthrough. case ABORTED: // Intentional fallthrough. if (error.get() == null) { - if (getProgress() != null && ! getProgress().isFinished()) - response.writeContinuation(getProgress().serializeToString()); + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); response.respond(Response.Status.OK); break; } default: response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsReturned()); + response.respond(Response.Status.INTERNAL_SERVER_ERROR); } visitDispatcher.execute(() -> { @@ -1113,6 +1138,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------ Helpers ------------------------------------------------ + private static String requireProperty(HttpRequest request, String name) { + return getProperty(request, name) + .orElseThrow(() -> new IllegalArgumentException("Must specify '" + name + "' at '" + request.getUri().getRawPath() + "'")); + } + /** Returns the last property with the given name, if present, or throws if this is empty or blank. */ private static Optional<String> getProperty(HttpRequest request, String name) { if ( ! request.parameters().containsKey(name)) @@ -1130,6 +1160,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return getProperty(request, name).map(parser::parse); } + private static void disallow(HttpRequest request, String... properties) { + for (String property : properties) + if (request.parameters().containsKey(property)) + throw new IllegalArgumentException("May not specify '" + property + "' at '" + request.getUri().getRawPath() + "'"); + } + @FunctionalInterface interface Parser<T> extends Function<String, T> { default T parse(String value) { @@ -1177,7 +1213,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } String name() { return name; } - String route() { return "[Content:cluster=" + name() + "]"; } Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } } |