diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 75 |
1 files changed, 53 insertions, 22 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index e6d5ea48e8f..74266fe2a6e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -46,6 +46,7 @@ import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; @@ -178,6 +179,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String DRY_RUN = "dryRun"; private static final String FROM_TIMESTAMP = "fromTimestamp"; private static final String TO_TIMESTAMP = "toTimestamp"; + private static final String INCLUDE_REMOVES = "includeRemoves"; private final Clock clock; private final Duration visitTimeout; @@ -760,8 +762,31 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } + private interface DocumentWriter { + void write(ByteArrayOutputStream out) throws IOException; + } + /** Writes documents to an internal queue, which is flushed regularly. */ void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException { + writeDocument(myOut -> { + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { + new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document); + } + }, completionHandler); + } + + void writeDocumentRemoval(DocumentId id, CompletionHandler completionHandler) throws IOException { + writeDocument(myOut -> { + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { + myJson.writeStartObject(); + myJson.writeStringField("remove", id.toString()); + myJson.writeEndObject(); + } + }, completionHandler); + } + + /** Writes documents to an internal queue, which is flushed regularly. */ + void writeDocument(DocumentWriter documentWriter, CompletionHandler completionHandler) throws IOException { if (completionHandler != null) { acks.add(completionHandler); ackDocuments(); @@ -771,9 +796,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early. ByteArrayOutputStream myOut = new ByteArrayOutputStream(1); myOut.write(','); // Prepend rather than append, to avoid double memory copying. - try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { - new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document); - } + documentWriter.write(myOut); docs.add(myOut); // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled. @@ -1173,6 +1196,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(DocumentOnly.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); parameters.visitInconsistentBuckets(true); + getProperty(request, INCLUDE_REMOVES, booleanParser).ifPresent(parameters::setVisitRemoves); if (streamed) { StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); concurrency.ifPresent(throttlePolicy::setMaxPendingCount); @@ -1247,8 +1271,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Called at the start of response rendering. */ default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { } - /** Called for every document received from backend visitors—must call the ack for these to proceed. */ - default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } + /** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */ + default void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { } /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */ default void onEnd(JsonResponse response) throws IOException { } @@ -1276,7 +1300,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { ResponseHandler handler, String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { visit(request, parameters, false, fullyApplied, handler, new VisitCallback() { - @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { outstanding.decrementAndGet(); @@ -1320,18 +1344,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeDocumentsArrayStart(); } - @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { try { - if (streamed) - response.writeDocumentValue(document, new CompletionHandler() { - @Override public void completed() { ack.run();} + if (streamed) { + CompletionHandler completion = new CompletionHandler() { + @Override public void completed() { ack.run(); } @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } - }); + }; + if (document != null) response.writeDocumentValue(document, completion); + else response.writeDocumentRemoval(removeId, completion); + } else { - response.writeDocumentValue(document, null); + if (document != null) response.writeDocumentValue(document, null); + else response.writeDocumentRemoval(removeId, null); ack.run(); } } @@ -1410,16 +1438,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (parameters.getRemoteDataHandler() == null) { parameters.setLocalDataHandler(new VisitorDataHandler() { @Override public void onMessage(Message m, AckToken token) { - if (m instanceof PutDocumentMessage) - callback.onDocument(response, - ((PutDocumentMessage) m).getDocumentPut().getDocument(), - () -> ack(token), - errorMessage -> { - error.set(errorMessage); - controller.abort(); - }); - else - throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass()); + Document document = null; + DocumentId removeId = null; + if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument(); + else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId(); + else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName()); + callback.onDocument(response, + document, + removeId, + () -> ack(token), + errorMessage -> { + error.set(errorMessage); + controller.abort(); + }); } }); } |