summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-09-07 14:04:57 +0200
committerjonmv <venstad@gmail.com>2023-09-07 14:04:57 +0200
commit4ce892bf9c569dd8bd125ca5328f959b544871cf (patch)
tree8d9eece5274577f15de5830f4a951cc6dbaae3d7 /vespaclient-container-plugin
parent3e0aa989cf77ba5459dfd66cb65507c3bb37a7e2 (diff)
Support visiting remove operations through /document/v1
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java75
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java22
2 files changed, 68 insertions, 29 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index e6d5ea48e8f..74266fe2a6e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -46,6 +46,7 @@ import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
@@ -178,6 +179,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String DRY_RUN = "dryRun";
private static final String FROM_TIMESTAMP = "fromTimestamp";
private static final String TO_TIMESTAMP = "toTimestamp";
+ private static final String INCLUDE_REMOVES = "includeRemoves";
private final Clock clock;
private final Duration visitTimeout;
@@ -760,8 +762,31 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
+ private interface DocumentWriter {
+ void write(ByteArrayOutputStream out) throws IOException;
+ }
+
/** Writes documents to an internal queue, which is flushed regularly. */
void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
+ writeDocument(myOut -> {
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
+ new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document);
+ }
+ }, completionHandler);
+ }
+
+ void writeDocumentRemoval(DocumentId id, CompletionHandler completionHandler) throws IOException {
+ writeDocument(myOut -> {
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
+ myJson.writeStartObject();
+ myJson.writeStringField("remove", id.toString());
+ myJson.writeEndObject();
+ }
+ }, completionHandler);
+ }
+
+ /** Writes documents to an internal queue, which is flushed regularly. */
+ void writeDocument(DocumentWriter documentWriter, CompletionHandler completionHandler) throws IOException {
if (completionHandler != null) {
acks.add(completionHandler);
ackDocuments();
@@ -771,9 +796,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
ByteArrayOutputStream myOut = new ByteArrayOutputStream(1);
myOut.write(','); // Prepend rather than append, to avoid double memory copying.
- try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
- new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document);
- }
+ documentWriter.write(myOut);
docs.add(myOut);
// Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
@@ -1173,6 +1196,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(DocumentOnly.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
parameters.visitInconsistentBuckets(true);
+ getProperty(request, INCLUDE_REMOVES, booleanParser).ifPresent(parameters::setVisitRemoves);
if (streamed) {
StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
@@ -1247,8 +1271,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Called at the start of response rendering. */
default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { }
- /** Called for every document received from backend visitors—must call the ack for these to proceed. */
- default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
+ /** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */
+ default void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { }
/** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
default void onEnd(JsonResponse response) throws IOException { }
@@ -1276,7 +1300,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
visit(request, parameters, false, fullyApplied, handler, new VisitCallback() {
- @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
outstanding.decrementAndGet();
@@ -1320,18 +1344,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
- @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
try {
- if (streamed)
- response.writeDocumentValue(document, new CompletionHandler() {
- @Override public void completed() { ack.run();}
+ if (streamed) {
+ CompletionHandler completion = new CompletionHandler() {
+ @Override public void completed() { ack.run(); }
@Override public void failed(Throwable t) {
ack.run();
onError.accept(t.getMessage());
}
- });
+ };
+ if (document != null) response.writeDocumentValue(document, completion);
+ else response.writeDocumentRemoval(removeId, completion);
+ }
else {
- response.writeDocumentValue(document, null);
+ if (document != null) response.writeDocumentValue(document, null);
+ else response.writeDocumentRemoval(removeId, null);
ack.run();
}
}
@@ -1410,16 +1438,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (parameters.getRemoteDataHandler() == null) {
parameters.setLocalDataHandler(new VisitorDataHandler() {
@Override public void onMessage(Message m, AckToken token) {
- if (m instanceof PutDocumentMessage)
- callback.onDocument(response,
- ((PutDocumentMessage) m).getDocumentPut().getDocument(),
- () -> ack(token),
- errorMessage -> {
- error.set(errorMessage);
- controller.abort();
- });
- else
- throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass());
+ Document document = null;
+ DocumentId removeId = null;
+ if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument();
+ else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId();
+ else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName());
+ callback.onDocument(response,
+ document,
+ removeId,
+ () -> ack(token),
+ errorMessage -> {
+ error.set(errorMessage);
+ controller.abort();
+ });
}
});
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index e8f42fbecfa..a6aeab61fa2 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -43,6 +43,7 @@ import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
@@ -190,7 +191,7 @@ public class DocumentV1ApiTest {
@Test
public void testResponses() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
- List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null));
+ List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null), new AckToken(null));
// GET at non-existent path returns 404 with available paths
var response = driver.sendRequest("http://localhost/document/v1/not-found");
assertSameJson("""
@@ -227,18 +228,21 @@ public class DocumentV1ApiTest {
assertEquals(9, parameters.getTraceLevel());
assertEquals(1_000_000, parameters.getFromTimestamp());
assertEquals(2_000_000, parameters.getToTimestamp());
+ assertTrue(parameters.visitRemoves());
// Put some documents in the response
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
+ parameters.getLocalDataHandler().onMessage(new RemoveDocumentMessage(new DocumentId("id:space:music::t-square-truth")), tokens.get(3));
VisitorStatistics statistics = new VisitorStatistics();
statistics.setBucketsVisited(1);
statistics.setDocumentsVisited(3);
parameters.getControlHandler().onVisitorStatistics(statistics);
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK");
});
- response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
- "&selection=all%20the%20things&fieldSet=[id]&timeout=6&tracelevel=9&fromTimestamp=1000000&toTimestamp=2000000");
+ response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025" +
+ "&concurrency=123&selection=all%20the%20things&fieldSet=[id]&timeout=6&tracelevel=9" +
+ "&fromTimestamp=1000000&toTimestamp=2000000&includeRemoves=TrUe");
assertSameJson("""
{
"pathId": "/document/v1",
@@ -246,20 +250,23 @@ public class DocumentV1ApiTest {
{
"id": "id:space:music::one",
"fields": {
- "artist": "Tom Waits",\s
- "embedding": { "type": "tensor(x[3])", "values": [1.0,2.0,3.0] }\s
+ "artist": "Tom Waits",
+ "embedding": { "type": "tensor(x[3])", "values": [1.0,2.0,3.0] }
}
},
{
"id": "id:space:music:n=1:two",
"fields": {
- "artist": "Asa-Chan & Jun-Ray",\s
- "embedding": { "type": "tensor(x[3])", "values": [4.0,5.0,6.0] }\s
+ "artist": "Asa-Chan & Jun-Ray",
+ "embedding": { "type": "tensor(x[3])", "values": [4.0,5.0,6.0] }
}
},
{
"id": "id:space:music:g=a:three",
"fields": {}
+ },
+ {
+ "remove": "id:space:music::t-square-truth"
}
],
"documentCount": 3,
@@ -290,6 +297,7 @@ public class DocumentV1ApiTest {
assertEquals(1, parameters.getSliceId());
assertEquals(0, parameters.getFromTimestamp()); // not set; 0 is default
assertEquals(0, parameters.getToTimestamp()); // not set; 0 is default
+ assertFalse(parameters.visitRemoves()); // false by default
// Put some documents in the response
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));