summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java99
1 files changed, 67 insertions, 32 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index e7f74161a8e..65744815ecf 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -55,6 +55,7 @@ import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
@@ -148,10 +149,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String FIELD_SET = "fieldSet";
private static final String SELECTION = "selection";
private static final String CLUSTER = "cluster";
+ private static final String DESTINATION_CLUSTER = "destinationCluster";
private static final String CONTINUATION = "continuation";
private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
private static final String CONCURRENCY = "concurrency";
private static final String BUCKET_SPACE = "bucketSpace";
+ private static final String TIME_CHUNK = "timeChunk";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
@@ -347,7 +350,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
- VisitorParameters parameters = parseParameters(request, path);
+ VisitorParameters parameters = parseGetParameters(request, path);
return () -> {
visitAndWrite(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
@@ -358,8 +361,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
+ StorageCluster destination = resolveCluster(Optional.of(requireProperty(request, DESTINATION_CLUSTER)), clusters);
VisitorParameters parameters = parseParameters(request, path);
- parameters.setRemoteDataHandler(getProperty(request, ROUTE).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + ROUTE + "'")));
+ parameters.setRemoteDataHandler("[Content:cluster=" + destination.name() + "]"); // Bypass indexing.
+ parameters.setFieldSet(AllFields.NAME);
return () -> {
visitWithRemote(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
@@ -369,19 +374,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- if (getProperty(request, SELECTION).isEmpty())
- throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
-
return new ForwardingContentChannel(in -> {
enqueueAndDispatch(request, handler, () -> {
- String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates"));
- IdIdString dummyId = new IdIdString("dummy", type, "", "");
+ StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters);
VisitorParameters parameters = parseParameters(request, path);
parameters.setFieldSet(DocIdOnly.NAME);
+ String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates"));
+ IdIdString dummyId = new IdIdString("dummy", type, "", "");
DocumentUpdate update = parser.parseUpdate(in, dummyId.toString());
- update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection()));
+ update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION)));
return () -> {
- visitAndUpdate(request, parameters, handler, update, getProperty(request, ROUTE));
+ visitAndUpdate(request, parameters, handler, update, cluster.name());
return true; // VisitorSession has its own throttle handling.
};
});
@@ -389,15 +392,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- if (getProperty(request, SELECTION).isEmpty())
- throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
-
enqueueAndDispatch(request, handler, () -> {
VisitorParameters parameters = parseParameters(request, path);
parameters.setFieldSet(DocIdOnly.NAME);
- TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection());
+ TestAndSetCondition condition = new TestAndSetCondition(requireProperty(request, SELECTION));
+ StorageCluster cluster = resolveCluster(Optional.of(requireProperty(request, CLUSTER)), clusters);
return () -> {
- visitAndDelete(request, parameters, handler, condition, getProperty(request, ROUTE));
+ visitAndDelete(request, parameters, handler, condition, cluster.name());
return true; // VisitorSession has its own throttle handling.
};
});
@@ -478,7 +479,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
.orElse(parameters());
for (String name : names) switch (name) {
case CLUSTER:
- parameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route())
+ parameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).name())
.map(parameters::withRoute)
.orElse(parameters);
break;
@@ -644,6 +645,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeStringField("message", message);
}
+ synchronized void writeDocumentCount(long count) throws IOException {
+ json.writeNumberField("documentCount", count);
+ }
+
synchronized void writeDocId(DocumentId id) throws IOException {
json.writeStringField("id", id.toString());
}
@@ -916,10 +921,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------- Visits ------------------------------------------------
- private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) {
+ private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) {
int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1));
if (wantedDocumentCount <= 0)
- throw new IllegalArgumentException("wantedDocumentCount must be positive");
+ throw new IllegalArgumentException("wantedDocumentCount must be positive");
int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1));
if (concurrency <= 0)
@@ -929,6 +934,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (cluster.isEmpty() && path.documentType().isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+ VisitorParameters parameters = parseCommonParameters(request, path, cluster);
+ parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ parameters.setMaxTotalHits(wantedDocumentCount);
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
+ parameters.visitInconsistentBuckets(true);
+ parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000));
+ return parameters;
+ }
+
+ private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) {
+ disallow(request, CONCURRENCY, FIELD_SET, ROUTE, WANTED_DOCUMENT_COUNT);
+ requireProperty(request, SELECTION);
+ VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER)));
+ parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1));
+ parameters.setSessionTimeoutMs(Math.max(1, getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L)));
+ return parameters;
+ }
+
+ private VisitorParameters parseCommonParameters(HttpRequest request, DocumentPath path, Optional<String> cluster) {
VisitorParameters parameters = new VisitorParameters(Stream.of(getProperty(request, SELECTION),
path.documentType(),
path.namespace().map(value -> "id.namespace=='" + value + "'"),
@@ -940,15 +964,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
.toString());
getProperty(request, CONTINUATION).map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
- parameters.setMaxTotalHits(wantedDocumentCount);
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000));
- parameters.visitInconsistentBuckets(true);
parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
StorageCluster storageCluster = resolveCluster(cluster, clusters);
- parameters.setRoute(storageCluster.route());
+ parameters.setRoute(storageCluster.name());
parameters.setBucketSpace(resolveBucket(storageCluster,
path.documentType(),
List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
@@ -969,7 +988,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
- TestAndSetCondition condition, Optional<String> route) {
+ TestAndSetCondition condition, String route) {
visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
DocumentRemove remove = new DocumentRemove(id);
remove.setCondition(condition);
@@ -978,7 +997,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
- DocumentUpdate protoUpdate, Optional<String> route) {
+ DocumentUpdate protoUpdate, String route) {
visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
DocumentUpdate update = new DocumentUpdate(protoUpdate);
update.setId(id);
@@ -987,11 +1006,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
- Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
+ String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
visit(request, parameters, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters()
- : parameters().withRoute(route.get()))
+ DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
outstanding.decrementAndGet();
switch (operationResponse.outcome()) {
@@ -1057,7 +1075,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
callback.onEnd(response);
switch (code) {
case TIMEOUT:
- if ( ! hasVisitedAnyBuckets()) {
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
response.writeMessage("No buckets visited within timeout of " +
parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
response.respond(Response.Status.GATEWAY_TIMEOUT);
@@ -1066,14 +1084,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
case SUCCESS: // Intentional fallthrough.
case ABORTED: // Intentional fallthrough.
if (error.get() == null) {
- if (getProgress() != null && ! getProgress().isFinished())
- response.writeContinuation(getProgress().serializeToString());
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
response.respond(Response.Status.OK);
break;
}
default:
response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsReturned());
+
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
visitDispatcher.execute(() -> {
@@ -1113,6 +1138,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------ Helpers ------------------------------------------------
+ private static String requireProperty(HttpRequest request, String name) {
+ return getProperty(request, name)
+ .orElseThrow(() -> new IllegalArgumentException("Must specify '" + name + "' at '" + request.getUri().getRawPath() + "'"));
+ }
+
/** Returns the last property with the given name, if present, or throws if this is empty or blank. */
private static Optional<String> getProperty(HttpRequest request, String name) {
if ( ! request.parameters().containsKey(name))
@@ -1130,6 +1160,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return getProperty(request, name).map(parser::parse);
}
+ private static void disallow(HttpRequest request, String... properties) {
+ for (String property : properties)
+ if (request.parameters().containsKey(property))
+ throw new IllegalArgumentException("May not specify '" + property + "' at '" + request.getUri().getRawPath() + "'");
+ }
+
@FunctionalInterface
interface Parser<T> extends Function<String, T> {
default T parse(String value) {
@@ -1177,7 +1213,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
String name() { return name; }
- String route() { return "[Content:cluster=" + name() + "]"; }
Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
}