summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java75
1 files changed, 53 insertions, 22 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index e6d5ea48e8f..74266fe2a6e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -46,6 +46,7 @@ import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
@@ -178,6 +179,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String DRY_RUN = "dryRun";
private static final String FROM_TIMESTAMP = "fromTimestamp";
private static final String TO_TIMESTAMP = "toTimestamp";
+ private static final String INCLUDE_REMOVES = "includeRemoves";
private final Clock clock;
private final Duration visitTimeout;
@@ -760,8 +762,31 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
+ private interface DocumentWriter {
+ void write(ByteArrayOutputStream out) throws IOException;
+ }
+
/** Writes documents to an internal queue, which is flushed regularly. */
void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
+ writeDocument(myOut -> {
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
+ new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document);
+ }
+ }, completionHandler);
+ }
+
+ void writeDocumentRemoval(DocumentId id, CompletionHandler completionHandler) throws IOException {
+ writeDocument(myOut -> {
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
+ myJson.writeStartObject();
+ myJson.writeStringField("remove", id.toString());
+ myJson.writeEndObject();
+ }
+ }, completionHandler);
+ }
+
+ /** Writes documents to an internal queue, which is flushed regularly. */
+ void writeDocument(DocumentWriter documentWriter, CompletionHandler completionHandler) throws IOException {
if (completionHandler != null) {
acks.add(completionHandler);
ackDocuments();
@@ -771,9 +796,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
ByteArrayOutputStream myOut = new ByteArrayOutputStream(1);
myOut.write(','); // Prepend rather than append, to avoid double memory copying.
- try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
- new JsonWriter(myJson, tensorShortForm(), tensorDirectValues()).write(document);
- }
+ documentWriter.write(myOut);
docs.add(myOut);
// Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
@@ -1173,6 +1196,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(DocumentOnly.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
parameters.visitInconsistentBuckets(true);
+ getProperty(request, INCLUDE_REMOVES, booleanParser).ifPresent(parameters::setVisitRemoves);
if (streamed) {
StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
@@ -1247,8 +1271,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Called at the start of response rendering. */
default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { }
- /** Called for every document received from backend visitors—must call the ack for these to proceed. */
- default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
+ /** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */
+ default void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { }
/** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
default void onEnd(JsonResponse response) throws IOException { }
@@ -1276,7 +1300,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
visit(request, parameters, false, fullyApplied, handler, new VisitCallback() {
- @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
outstanding.decrementAndGet();
@@ -1320,18 +1344,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
- @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ @Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
try {
- if (streamed)
- response.writeDocumentValue(document, new CompletionHandler() {
- @Override public void completed() { ack.run();}
+ if (streamed) {
+ CompletionHandler completion = new CompletionHandler() {
+ @Override public void completed() { ack.run(); }
@Override public void failed(Throwable t) {
ack.run();
onError.accept(t.getMessage());
}
- });
+ };
+ if (document != null) response.writeDocumentValue(document, completion);
+ else response.writeDocumentRemoval(removeId, completion);
+ }
else {
- response.writeDocumentValue(document, null);
+ if (document != null) response.writeDocumentValue(document, null);
+ else response.writeDocumentRemoval(removeId, null);
ack.run();
}
}
@@ -1410,16 +1438,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (parameters.getRemoteDataHandler() == null) {
parameters.setLocalDataHandler(new VisitorDataHandler() {
@Override public void onMessage(Message m, AckToken token) {
- if (m instanceof PutDocumentMessage)
- callback.onDocument(response,
- ((PutDocumentMessage) m).getDocumentPut().getDocument(),
- () -> ack(token),
- errorMessage -> {
- error.set(errorMessage);
- controller.abort();
- });
- else
- throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass());
+ Document document = null;
+ DocumentId removeId = null;
+ if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument();
+ else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId();
+ else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName());
+ callback.onDocument(response,
+ document,
+ removeId,
+ () -> ack(token),
+ errorMessage -> {
+ error.set(errorMessage);
+ controller.abort();
+ });
}
});
}