diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 77 |
1 files changed, 51 insertions, 26 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 1f5c3e93572..7c80db2ddc3 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -55,6 +55,7 @@ import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; @@ -148,10 +149,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String FIELD_SET = "fieldSet"; private static final String SELECTION = "selection"; private static final String CLUSTER = "cluster"; + private static final String DESTINATION_CLUSTER = "destinationCluster"; private static final String CONTINUATION = "continuation"; private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount"; private static final String CONCURRENCY = "concurrency"; private static final String BUCKET_SPACE = "bucketSpace"; + private static final String TIME_CHUNK = "timeChunk"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; @@ -347,7 +350,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - VisitorParameters parameters = parseParameters(request, path); + VisitorParameters parameters = parseGetParameters(request, path); return () -> { visitAndWrite(request, parameters, handler); return true; // VisitorSession has its own throttle handling. @@ -358,8 +361,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { + StorageCluster destination = resolveCluster(Optional.of(requireProperty(request, DESTINATION_CLUSTER)), clusters); VisitorParameters parameters = parseParameters(request, path); - parameters.setRemoteDataHandler(getProperty(request, ROUTE).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + ROUTE + "'"))); + parameters.setRemoteDataHandler("[Content:cluster=" + destination.name() + "]"); // Bypass indexing. + parameters.setFieldSet(AllFields.NAME); return () -> { visitWithRemote(request, parameters, handler); return true; // VisitorSession has its own throttle handling. @@ -369,19 +374,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { - if (getProperty(request, SELECTION).isEmpty()) - throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); - return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); - IdIdString dummyId = new IdIdString("dummy", type, "", ""); + StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters); VisitorParameters parameters = parseParameters(request, path); parameters.setFieldSet(DocIdOnly.NAME); + String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); + IdIdString dummyId = new IdIdString("dummy", type, "", ""); DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); - update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection())); + update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); return () -> { - visitAndUpdate(request, parameters, handler, update, getProperty(request, ROUTE)); + visitAndUpdate(request, parameters, handler, update, cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -389,15 +392,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { - if (getProperty(request, SELECTION).isEmpty()) - throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); - enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseParameters(request, path); parameters.setFieldSet(DocIdOnly.NAME); - TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection()); + TestAndSetCondition condition = new TestAndSetCondition(requireProperty(request, SELECTION)); + StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters); return () -> { - visitAndDelete(request, parameters, handler, condition, getProperty(request, ROUTE)); + visitAndDelete(request, parameters, handler, condition, cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -916,10 +917,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) { + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) { int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1)); if (wantedDocumentCount <= 0) - throw new IllegalArgumentException("wantedDocumentCount must be positive"); + throw new IllegalArgumentException("wantedDocumentCount must be positive"); int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1)); if (concurrency <= 0) @@ -929,6 +930,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + VisitorParameters parameters = parseCommonParameters(request, path, cluster); + parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); + parameters.setMaxTotalHits(wantedDocumentCount); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); + parameters.visitInconsistentBuckets(true); + parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000)); + return parameters; + } + + private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) { + disallow(request, CONCURRENCY, FIELD_SET, ROUTE, WANTED_DOCUMENT_COUNT); + requireProperty(request, SELECTION); + VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER))); + parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1)); + parameters.setSessionTimeoutMs(Math.max(1, getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L))); + return parameters; + } + + private VisitorParameters parseCommonParameters(HttpRequest request, DocumentPath path, Optional<String> cluster) { VisitorParameters parameters = new VisitorParameters(Stream.of(getProperty(request, SELECTION), path.documentType(), path.namespace().map(value -> "id.namespace=='" + value + "'"), @@ -940,11 +960,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .toString()); getProperty(request, CONTINUATION).map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); - parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); - parameters.setMaxTotalHits(wantedDocumentCount); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000)); - parameters.visitInconsistentBuckets(true); parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); StorageCluster storageCluster = resolveCluster(cluster, clusters); @@ -969,7 +984,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - TestAndSetCondition condition, Optional<String> route) { + TestAndSetCondition condition, String route) { visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { DocumentRemove remove = new DocumentRemove(id); remove.setCondition(condition); @@ -978,7 +993,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - DocumentUpdate protoUpdate, Optional<String> route) { + DocumentUpdate protoUpdate, String route) { visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { DocumentUpdate update = new DocumentUpdate(protoUpdate); update.setId(id); @@ -987,11 +1002,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { + String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { visit(request, parameters, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters() - : parameters().withRoute(route.get())) + DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { outstanding.decrementAndGet(); switch (operationResponse.outcome()) { @@ -1113,6 +1127,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------ Helpers ------------------------------------------------ + private static String requireProperty(HttpRequest request, String name) { + return getProperty(request, name) + .orElseThrow(() -> new IllegalArgumentException("Must specify '" + name + "' at '" + request.getUri().getRawPath() + "'")); + } + /** Returns the last property with the given name, if present, or throws if this is empty or blank. */ private static Optional<String> getProperty(HttpRequest request, String name) { if ( ! request.parameters().containsKey(name)) @@ -1130,6 +1149,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return getProperty(request, name).map(parser::parse); } + private static void disallow(HttpRequest request, String... properties) { + for (String property : properties) + if (request.parameters().containsKey(property)) + throw new IllegalArgumentException("May not specify '" + property + "' at '" + request.getUri().getRawPath() + "'"); + } + @FunctionalInterface interface Parser<T> extends Function<String, T> { default T parse(String value) { |