summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java120
1 files changed, 65 insertions, 55 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index c72bc1ef4c5..2d97c33741e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -12,7 +12,6 @@ import com.yahoo.container.core.documentapi.VespaDocumentAccess;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentRemove;
import com.yahoo.document.DocumentTypeManager;
@@ -26,6 +25,7 @@ import com.yahoo.document.idstring.IdIdString;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.json.ParsedDocumentOperation;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AckToken;
@@ -67,6 +67,7 @@ import com.yahoo.restapi.Path;
import com.yahoo.search.query.ParameterParser;
import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.http.server.Headers;
import com.yahoo.vespa.http.server.MetricNames;
import com.yahoo.yolean.Exceptions;
import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
@@ -215,7 +216,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.operations = new ConcurrentLinkedDeque<>();
long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();
- // TODO: Here it would be better do have dedicated threads with different wait depending on blocked or empty.
+ // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
}
@@ -238,7 +239,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
MILLISECONDS);
Path requestPath = Path.withoutValidation(request.getUri()); // No segment validation here, as document IDs can be anything.
- for (String path : handlers.keySet())
+ for (String path : handlers.keySet()) {
if (requestPath.matches(path)) {
Map<Method, Handler> methods = handlers.get(path);
if (methods.containsKey(request.getMethod()))
@@ -249,6 +250,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
methodNotAllowed(request, methods.keySet(), responseHandler);
}
+ }
notFound(request, handlers.keySet(), responseHandler);
}
catch (IllegalArgumentException e) {
@@ -398,10 +400,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
parameters.setFieldSet(DocIdOnly.NAME);
String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates"));
IdIdString dummyId = new IdIdString("dummy", type, "", "");
- DocumentUpdate update = parser.parseUpdate(in, dummyId.toString());
- update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION)));
+ ParsedDocumentOperation update = parser.parseUpdate(in, dummyId.toString());
+ update.operation().setCondition(new TestAndSetCondition(requireProperty(request, SELECTION)));
return () -> {
- visitAndUpdate(request, parameters, handler, update, cluster.name());
+ visitAndUpdate(request, parameters, update.fullyApplied(), handler, (DocumentUpdate)update.operation(), cluster.name());
return true; // VisitorSession has its own throttle handling.
};
});
@@ -448,21 +450,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant());
if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) {
- handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1));
+ handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1));
return ignoredContent;
}
return new ForwardingContentChannel(in -> {
enqueueAndDispatch(request, handler, () -> {
- DocumentPut put = parser.parsePut(in, path.id().toString());
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
+ ParsedDocumentOperation put = parser.parsePut(in, path.id().toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(c -> put.operation().setCondition(c));
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
.withResponseHandler(response -> {
outstanding.decrementAndGet();
updatePutMetrics(response.outcome());
- handleFeedOperation(path, handler, response);
+ handleFeedOperation(path, put.fullyApplied(), handler, response);
});
- return () -> dispatchOperation(() -> asyncSession.put(put, parameters));
+ return () -> dispatchOperation(() -> asyncSession.put((DocumentPut)put.operation(), parameters));
});
});
}
@@ -470,20 +472,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.UPDATE, clock.instant());
if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) {
- handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1));
+ handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1));
return ignoredContent;
}
return new ForwardingContentChannel(in -> {
enqueueAndDispatch(request, handler, () -> {
- DocumentUpdate update = parser.parseUpdate(in, path.id().toString());
+ ParsedDocumentOperation parsed = parser.parseUpdate(in, path.id().toString());
+ DocumentUpdate update = (DocumentUpdate)parsed.operation();
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
.withResponseHandler(response -> {
outstanding.decrementAndGet();
updateUpdateMetrics(response.outcome(), update.getCreateIfNonExistent());
- handleFeedOperation(path, handler, response);
+ handleFeedOperation(path, parsed.fullyApplied(), handler, response);
});
return () -> dispatchOperation(() -> asyncSession.update(update, parameters));
});
@@ -493,7 +496,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant());
if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) {
- handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1));
+ handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1));
return ignoredContent;
}
@@ -504,7 +507,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
.withResponseHandler(response -> {
outstanding.decrementAndGet();
updateRemoveMetrics(response.outcome());
- handleFeedOperation(path, handler, response);
+ handleFeedOperation(path, true, handler, response);
});
return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters));
});
@@ -659,10 +662,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return response;
}
- /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */
synchronized void commit(int status) throws IOException {
+ commit(status, true);
+ }
+
+ /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */
+ synchronized void commit(int status, boolean fullyApplied) throws IOException {
Response response = new Response(status);
- response.headers().addAll(Map.of("Content-Type", List.of("application/json; charset=UTF-8")));
+ response.headers().add("Content-Type", List.of("application/json; charset=UTF-8"));
+ if (! fullyApplied)
+ response.headers().add(Headers.IGNORED_FIELDS, "true");
try {
channel = handler.handleResponse(response);
buffer.connectTo(channel);
@@ -1023,15 +1032,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.manager = new DocumentTypeManager(config);
}
- DocumentPut parsePut(InputStream inputStream, String docId) {
- return (DocumentPut) parse(inputStream, docId, DocumentOperationType.PUT);
+ ParsedDocumentOperation parsePut(InputStream inputStream, String docId) {
+ return parse(inputStream, docId, DocumentOperationType.PUT);
}
- DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
- return (DocumentUpdate) parse(inputStream, docId, DocumentOperationType.UPDATE);
+ ParsedDocumentOperation parseUpdate(InputStream inputStream, String docId) {
+ return parse(inputStream, docId, DocumentOperationType.UPDATE);
}
- private DocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) {
+ private ParsedDocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) {
return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
}
@@ -1041,7 +1050,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
void onSuccess(Document document, JsonResponse response) throws IOException;
}
- private static void handle(DocumentPath path, HttpRequest request, ResponseHandler handler, com.yahoo.documentapi.Response response, SuccessCallback callback) {
+ private static void handle(DocumentPath path,
+ HttpRequest request,
+ ResponseHandler handler,
+ com.yahoo.documentapi.Response response,
+ SuccessCallback callback) {
try (JsonResponse jsonResponse = JsonResponse.create(path, handler, request)) {
jsonResponse.writeTrace(response.getTrace());
if (response.isSuccess())
@@ -1049,25 +1062,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
else {
jsonResponse.writeMessage(response.getTextMessage());
switch (response.outcome()) {
- case NOT_FOUND:
- jsonResponse.commit(Response.Status.NOT_FOUND);
- break;
- case CONDITION_FAILED:
- jsonResponse.commit(Response.Status.PRECONDITION_FAILED);
- break;
- case INSUFFICIENT_STORAGE:
- jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE);
- break;
- case TIMEOUT:
- jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT);
- break;
- case ERROR:
+ case NOT_FOUND -> jsonResponse.commit(Response.Status.NOT_FOUND);
+ case CONDITION_FAILED -> jsonResponse.commit(Response.Status.PRECONDITION_FAILED);
+ case INSUFFICIENT_STORAGE -> jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE);
+ case TIMEOUT -> jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT);
+ case ERROR -> {
log.log(FINE, () -> "Exception performing document operation: " + response.getTextMessage());
jsonResponse.commit(Response.Status.BAD_GATEWAY);
- break;
- default:
+ }
+ default -> {
log.log(WARNING, "Unexpected document API operation outcome '" + response.outcome() + "' " + response.getTextMessage());
jsonResponse.commit(Response.Status.BAD_GATEWAY);
+ }
}
}
}
@@ -1076,8 +1082,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
- private static void handleFeedOperation(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response) {
- handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK));
+ private static void handleFeedOperation(DocumentPath path,
+ boolean fullyApplied,
+ ResponseHandler handler,
+ com.yahoo.documentapi.Response response) {
+ handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK, fullyApplied));
}
private void updatePutMetrics(Outcome outcome) {
@@ -1188,7 +1197,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private interface VisitCallback {
/** Called at the start of response rendering. */
- default void onStart(JsonResponse response) throws IOException { }
+ default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { }
/** Called for every document received from backend visitors—must call the ack for these to proceed. */
default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
@@ -1199,25 +1208,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
TestAndSetCondition condition, String route) {
- visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ visitAndProcess(request, parameters, true, handler, route, (id, operationParameters) -> {
DocumentRemove remove = new DocumentRemove(id);
remove.setCondition(condition);
return asyncSession.remove(remove, operationParameters);
});
}
- private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
- DocumentUpdate protoUpdate, String route) {
- visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, boolean fullyApplied,
+ ResponseHandler handler, DocumentUpdate protoUpdate, String route) {
+ visitAndProcess(request, parameters, fullyApplied, handler, route, (id, operationParameters) -> {
DocumentUpdate update = new DocumentUpdate(protoUpdate);
update.setId(id);
return asyncSession.update(update, operationParameters);
});
}
- private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ private void visitAndProcess(HttpRequest request, VisitorParameters parameters, boolean fullyApplied,
+ ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
- visit(request, parameters, false, handler, new VisitCallback() {
+ visit(request, parameters, false, fullyApplied, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
@@ -1255,10 +1265,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) {
- visit(request, parameters, streamed, handler, new VisitCallback() {
- @Override public void onStart(JsonResponse response) throws IOException {
+ visit(request, parameters, streamed, true, handler, new VisitCallback() {
+ @Override public void onStart(JsonResponse response, boolean fullyApplied) throws IOException {
if (streamed)
- response.commit(Response.Status.OK);
+ response.commit(Response.Status.OK, fullyApplied);
response.writeDocumentsArrayStart();
}
@@ -1288,16 +1298,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, false, handler, new VisitCallback() { });
+ visit(request, parameters, false, true, handler, new VisitCallback() { });
}
@SuppressWarnings("fallthrough")
- private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
+ private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, boolean fullyApplied, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
- callback.onStart(response);
+ callback.onStart(response, fullyApplied);
VisitorControlHandler controller = new VisitorControlHandler() {
final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null;
@Override public void onDone(CompletionCode code, String message) {
@@ -1332,7 +1342,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
}
if ( ! streaming)
- response.commit(status);
+ response.commit(status, fullyApplied);
}
});
if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion.