From 4ce892bf9c569dd8bd125ca5328f959b544871cf Mon Sep 17 00:00:00 2001 From: jonmv Date: Thu, 7 Sep 2023 14:04:57 +0200 Subject: Support visiting remove operations through /document/v1 --- .../restapi/resource/DocumentV1ApiHandler.java | 75 +++++++++++++++------- .../restapi/resource/DocumentV1ApiTest.java | 22 +++++-- 2 files changed, 68 insertions(+), 29 deletions(-) (limited to 'vespaclient-container-plugin') diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index e6d5ea48e8f..74266fe2a6e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -46,6 +46,7 @@ import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; @@ -178,6 +179,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String DRY_RUN = "dryRun"; private static final String FROM_TIMESTAMP = "fromTimestamp"; private static final String TO_TIMESTAMP = "toTimestamp"; + private static final String INCLUDE_REMOVES = "includeRemoves"; private final Clock clock; private final Duration visitTimeout; @@ -760,8 +762,31 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } + private interface DocumentWriter { + void write(ByteArrayOutputStream out) throws IOException; + } + /** Writes documents to an internal queue, which is flushed regularly. */ void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException { + writeDocument(myOut -> { + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { + new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document); + } + }, completionHandler); + } + + void writeDocumentRemoval(DocumentId id, CompletionHandler completionHandler) throws IOException { + writeDocument(myOut -> { + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { + myJson.writeStartObject(); + myJson.writeStringField("remove", id.toString()); + myJson.writeEndObject(); + } + }, completionHandler); + } + + /** Writes documents to an internal queue, which is flushed regularly. */ + void writeDocument(DocumentWriter documentWriter, CompletionHandler completionHandler) throws IOException { if (completionHandler != null) { acks.add(completionHandler); ackDocuments(); @@ -771,9 +796,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early. ByteArrayOutputStream myOut = new ByteArrayOutputStream(1); myOut.write(','); // Prepend rather than append, to avoid double memory copying. - try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { - new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document); - } + documentWriter.write(myOut); docs.add(myOut); // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled. @@ -1173,6 +1196,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(DocumentOnly.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); parameters.visitInconsistentBuckets(true); + getProperty(request, INCLUDE_REMOVES, booleanParser).ifPresent(parameters::setVisitRemoves); if (streamed) { StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); concurrency.ifPresent(throttlePolicy::setMaxPendingCount); @@ -1247,8 +1271,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Called at the start of response rendering. */ default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { } - /** Called for every document received from backend visitors—must call the ack for these to proceed. */ - default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { } + /** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */ + default void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer onError) { } /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */ default void onEnd(JsonResponse response) throws IOException { } @@ -1276,7 +1300,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { ResponseHandler handler, String route, BiFunction operation) { visit(request, parameters, false, fullyApplied, handler, new VisitCallback() { - @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { + @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { outstanding.decrementAndGet(); @@ -1320,18 +1344,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeDocumentsArrayStart(); } - @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { + @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer onError) { try { - if (streamed) - response.writeDocumentValue(document, new CompletionHandler() { - @Override public void completed() { ack.run();} + if (streamed) { + CompletionHandler completion = new CompletionHandler() { + @Override public void completed() { ack.run(); } @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } - }); + }; + if (document != null) response.writeDocumentValue(document, completion); + else response.writeDocumentRemoval(removeId, completion); + } else { - response.writeDocumentValue(document, null); + if (document != null) response.writeDocumentValue(document, null); + else response.writeDocumentRemoval(removeId, null); ack.run(); } } @@ -1410,16 +1438,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (parameters.getRemoteDataHandler() == null) { parameters.setLocalDataHandler(new VisitorDataHandler() { @Override public void onMessage(Message m, AckToken token) { - if (m instanceof PutDocumentMessage) - callback.onDocument(response, - ((PutDocumentMessage) m).getDocumentPut().getDocument(), - () -> ack(token), - errorMessage -> { - error.set(errorMessage); - controller.abort(); - }); - else - throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass()); + Document document = null; + DocumentId removeId = null; + if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument(); + else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId(); + else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName()); + callback.onDocument(response, + document, + removeId, + () -> ack(token), + errorMessage -> { + error.set(errorMessage); + controller.abort(); + }); } }); } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index e8f42fbecfa..a6aeab61fa2 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -43,6 +43,7 @@ import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.jdisc.test.MockMetric; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; @@ -190,7 +191,7 @@ public class DocumentV1ApiTest { @Test public void testResponses() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); - List tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); + List tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths var response = driver.sendRequest("http://localhost/document/v1/not-found"); assertSameJson(""" @@ -227,18 +228,21 @@ public class DocumentV1ApiTest { assertEquals(9, parameters.getTraceLevel()); assertEquals(1_000_000, parameters.getFromTimestamp()); assertEquals(2_000_000, parameters.getToTimestamp()); + assertTrue(parameters.visitRemoves()); // Put some documents in the response parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); + parameters.getLocalDataHandler().onMessage(new RemoveDocumentMessage(new DocumentId("id:space:music::t-square-truth")), tokens.get(3)); VisitorStatistics statistics = new VisitorStatistics(); statistics.setBucketsVisited(1); statistics.setDocumentsVisited(3); parameters.getControlHandler().onVisitorStatistics(statistics); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK"); }); - response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + - "&selection=all%20the%20things&fieldSet=[id]&timeout=6&tracelevel=9&fromTimestamp=1000000&toTimestamp=2000000"); + response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025" + + "&concurrency=123&selection=all%20the%20things&fieldSet=[id]&timeout=6&tracelevel=9" + + "&fromTimestamp=1000000&toTimestamp=2000000&includeRemoves=TrUe"); assertSameJson(""" { "pathId": "/document/v1", @@ -246,20 +250,23 @@ public class DocumentV1ApiTest { { "id": "id:space:music::one", "fields": { - "artist": "Tom Waits",\s - "embedding": { "type": "tensor(x[3])", "values": [1.0,2.0,3.0] }\s + "artist": "Tom Waits", + "embedding": { "type": "tensor(x[3])", "values": [1.0,2.0,3.0] } } }, { "id": "id:space:music:n=1:two", "fields": { - "artist": "Asa-Chan & Jun-Ray",\s - "embedding": { "type": "tensor(x[3])", "values": [4.0,5.0,6.0] }\s + "artist": "Asa-Chan & Jun-Ray", + "embedding": { "type": "tensor(x[3])", "values": [4.0,5.0,6.0] } } }, { "id": "id:space:music:g=a:three", "fields": {} + }, + { + "remove": "id:space:music::t-square-truth" } ], "documentCount": 3, @@ -290,6 +297,7 @@ public class DocumentV1ApiTest { assertEquals(1, parameters.getSliceId()); assertEquals(0, parameters.getFromTimestamp()); // not set; 0 is default assertEquals(0, parameters.getToTimestamp()); // not set; 0 is default + assertFalse(parameters.visitRemoves()); // false by default // Put some documents in the response parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); -- cgit v1.2.3