diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 120 |
1 files changed, 65 insertions, 55 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index c72bc1ef4c5..2d97c33741e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -12,7 +12,6 @@ import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentTypeManager; @@ -26,6 +25,7 @@ import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.json.ParsedDocumentOperation; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.AckToken; @@ -67,6 +67,7 @@ import com.yahoo.restapi.Path; import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.http.server.Headers; import com.yahoo.vespa.http.server.MetricNames; import com.yahoo.yolean.Exceptions; import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; @@ -215,7 +216,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.operations = new ConcurrentLinkedDeque<>(); long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis(); - // TODO: Here it would be better do have dedicated threads with different wait depending on blocked or empty. + // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty. this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); } @@ -238,7 +239,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { MILLISECONDS); Path requestPath = Path.withoutValidation(request.getUri()); // No segment validation here, as document IDs can be anything. - for (String path : handlers.keySet()) + for (String path : handlers.keySet()) { if (requestPath.matches(path)) { Map<Method, Handler> methods = handlers.get(path); if (methods.containsKey(request.getMethod())) @@ -249,6 +250,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { methodNotAllowed(request, methods.keySet(), responseHandler); } + } notFound(request, handlers.keySet(), responseHandler); } catch (IllegalArgumentException e) { @@ -398,10 +400,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setFieldSet(DocIdOnly.NAME); String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); IdIdString dummyId = new IdIdString("dummy", type, "", ""); - DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); - update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); + ParsedDocumentOperation update = parser.parseUpdate(in, dummyId.toString()); + update.operation().setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); return () -> { - visitAndUpdate(request, parameters, handler, update, cluster.name()); + visitAndUpdate(request, parameters, update.fullyApplied(), handler, (DocumentUpdate)update.operation(), cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -448,21 +450,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - DocumentPut put = parser.parsePut(in, path.id().toString()); - getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); + ParsedDocumentOperation put = parser.parsePut(in, path.id().toString()); + getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(c -> put.operation().setCondition(c)); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) .withResponseHandler(response -> { outstanding.decrementAndGet(); updatePutMetrics(response.outcome()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, put.fullyApplied(), handler, response); }); - return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); + return () -> dispatchOperation(() -> asyncSession.put((DocumentPut)put.operation(), parameters)); }); }); } @@ -470,20 +472,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.UPDATE, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - DocumentUpdate update = parser.parseUpdate(in, path.id().toString()); + ParsedDocumentOperation parsed = parser.parseUpdate(in, path.id().toString()); + DocumentUpdate update = (DocumentUpdate)parsed.operation(); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) .withResponseHandler(response -> { outstanding.decrementAndGet(); updateUpdateMetrics(response.outcome(), update.getCreateIfNonExistent()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, parsed.fullyApplied(), handler, response); }); return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); @@ -493,7 +496,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } @@ -504,7 +507,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .withResponseHandler(response -> { outstanding.decrementAndGet(); updateRemoveMetrics(response.outcome()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, true, handler, response); }); return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); @@ -659,10 +662,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return response; } - /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */ synchronized void commit(int status) throws IOException { + commit(status, true); + } + + /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */ + synchronized void commit(int status, boolean fullyApplied) throws IOException { Response response = new Response(status); - response.headers().addAll(Map.of("Content-Type", List.of("application/json; charset=UTF-8"))); + response.headers().add("Content-Type", List.of("application/json; charset=UTF-8")); + if (! fullyApplied) + response.headers().add(Headers.IGNORED_FIELDS, "true"); try { channel = handler.handleResponse(response); buffer.connectTo(channel); @@ -1023,15 +1032,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.manager = new DocumentTypeManager(config); } - DocumentPut parsePut(InputStream inputStream, String docId) { - return (DocumentPut) parse(inputStream, docId, DocumentOperationType.PUT); + ParsedDocumentOperation parsePut(InputStream inputStream, String docId) { + return parse(inputStream, docId, DocumentOperationType.PUT); } - DocumentUpdate parseUpdate(InputStream inputStream, String docId) { - return (DocumentUpdate) parse(inputStream, docId, DocumentOperationType.UPDATE); + ParsedDocumentOperation parseUpdate(InputStream inputStream, String docId) { + return parse(inputStream, docId, DocumentOperationType.UPDATE); } - private DocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) { + private ParsedDocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) { return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId); } @@ -1041,7 +1050,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { void onSuccess(Document document, JsonResponse response) throws IOException; } - private static void handle(DocumentPath path, HttpRequest request, ResponseHandler handler, com.yahoo.documentapi.Response response, SuccessCallback callback) { + private static void handle(DocumentPath path, + HttpRequest request, + ResponseHandler handler, + com.yahoo.documentapi.Response response, + SuccessCallback callback) { try (JsonResponse jsonResponse = JsonResponse.create(path, handler, request)) { jsonResponse.writeTrace(response.getTrace()); if (response.isSuccess()) @@ -1049,25 +1062,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { else { jsonResponse.writeMessage(response.getTextMessage()); switch (response.outcome()) { - case NOT_FOUND: - jsonResponse.commit(Response.Status.NOT_FOUND); - break; - case CONDITION_FAILED: - jsonResponse.commit(Response.Status.PRECONDITION_FAILED); - break; - case INSUFFICIENT_STORAGE: - jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE); - break; - case TIMEOUT: - jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT); - break; - case ERROR: + case NOT_FOUND -> jsonResponse.commit(Response.Status.NOT_FOUND); + case CONDITION_FAILED -> jsonResponse.commit(Response.Status.PRECONDITION_FAILED); + case INSUFFICIENT_STORAGE -> jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE); + case TIMEOUT -> jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT); + case ERROR -> { log.log(FINE, () -> "Exception performing document operation: " + response.getTextMessage()); jsonResponse.commit(Response.Status.BAD_GATEWAY); - break; - default: + } + default -> { log.log(WARNING, "Unexpected document API operation outcome '" + response.outcome() + "' " + response.getTextMessage()); jsonResponse.commit(Response.Status.BAD_GATEWAY); + } } } } @@ -1076,8 +1082,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } - private static void handleFeedOperation(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response) { - handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK)); + private static void handleFeedOperation(DocumentPath path, + boolean fullyApplied, + ResponseHandler handler, + com.yahoo.documentapi.Response response) { + handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK, fullyApplied)); } private void updatePutMetrics(Outcome outcome) { @@ -1188,7 +1197,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private interface VisitCallback { /** Called at the start of response rendering. */ - default void onStart(JsonResponse response) throws IOException { } + default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { } /** Called for every document received from backend visitors—must call the ack for these to proceed. */ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } @@ -1199,25 +1208,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, TestAndSetCondition condition, String route) { - visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + visitAndProcess(request, parameters, true, handler, route, (id, operationParameters) -> { DocumentRemove remove = new DocumentRemove(id); remove.setCondition(condition); return asyncSession.remove(remove, operationParameters); }); } - private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - DocumentUpdate protoUpdate, String route) { - visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, boolean fullyApplied, + ResponseHandler handler, DocumentUpdate protoUpdate, String route) { + visitAndProcess(request, parameters, fullyApplied, handler, route, (id, operationParameters) -> { DocumentUpdate update = new DocumentUpdate(protoUpdate); update.setId(id); return asyncSession.update(update, operationParameters); }); } - private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + private void visitAndProcess(HttpRequest request, VisitorParameters parameters, boolean fullyApplied, + ResponseHandler handler, String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { - visit(request, parameters, false, handler, new VisitCallback() { + visit(request, parameters, false, fullyApplied, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { @@ -1255,10 +1265,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) { - visit(request, parameters, streamed, handler, new VisitCallback() { - @Override public void onStart(JsonResponse response) throws IOException { + visit(request, parameters, streamed, true, handler, new VisitCallback() { + @Override public void onStart(JsonResponse response, boolean fullyApplied) throws IOException { if (streamed) - response.commit(Response.Status.OK); + response.commit(Response.Status.OK, fullyApplied); response.writeDocumentsArrayStart(); } @@ -1288,16 +1298,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, false, handler, new VisitCallback() { }); + visit(request, parameters, false, true, handler, new VisitCallback() { }); } @SuppressWarnings("fallthrough") - private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { + private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, boolean fullyApplied, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. - callback.onStart(response); + callback.onStart(response, fullyApplied); VisitorControlHandler controller = new VisitorControlHandler() { final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null; @Override public void onDone(CompletionCode code, String message) { @@ -1332,7 +1342,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); } if ( ! streaming) - response.commit(status); + response.commit(status, fullyApplied); } }); if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion. |