aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-02-02 14:05:28 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-02-03 09:54:15 +0100
commitad290b1171641a1c2a6ea8f16c3995deefb77ed6 (patch)
tree3498c785b78d5838c264475ffd3686238f23ea16 /vespaclient-container-plugin
parent813afe71808808ef983003be90c6e8bcc5c2ac0a (diff)
Refactor to allow more flex in reacting to documents received
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java85
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java33
2 files changed, 88 insertions, 30 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 7330d21d830..48e5f37a4f8 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,23 +20,26 @@ import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.idstring.IdIdString;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
import com.yahoo.document.json.JsonWriter;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
@@ -51,6 +54,7 @@ import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.TraceNode;
@@ -85,6 +89,7 @@ import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
@@ -304,7 +309,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
enqueueAndDispatch(request, handler, () -> {
VisitorParameters parameters = parseParameters(request, path);
return () -> {
- visit(request, parameters, handler);
+ visitAndWrite(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -830,24 +835,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return parameters;
}
- private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ private interface VisitCallback {
+ /** Called at the start of response rendering. */
+ default void onStart(JsonResponse response) throws IOException { }
+
+ /** Called for every document received from backend visitors — must call the ack for these to proceed. */
+ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
+
+ /** Called at the end of response rendering, before generic status data is written. */
+ default void onEnd(JsonResponse response) throws IOException { }
+ }
+
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() {
+ @Override public void onStart(JsonResponse response) throws IOException {
+ response.writeDocumentsArrayStart();
+ }
+ @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ response.writeDocumentValue(document);
+ ack.run();
+ }
+ @Override public void onEnd(JsonResponse response) throws IOException {
+ response.writeArrayEnd();
+ }
+ });
+ }
+
+ private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
- response.writeDocumentsArrayStart();
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) {
- loggingException(() -> {
- response.writeDocumentValue(doc);
- });
- }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
+ AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
+ callback.onStart(response);
+ VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
- response.writeArrayEnd(); // Close "documents" array.
+ callback.onEnd(response);
switch (code) {
case TIMEOUT:
if ( ! hasVisitedAnyBuckets()) {
@@ -858,13 +882,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
case SUCCESS: // Intentional fallthrough.
case ABORTED: // Intentional fallthrough.
- if (getProgress() != null && ! getProgress().isFinished())
- response.writeContinuation(getProgress().serializeToString());
+ if (error.get() == null) {
+ if (getProgress() != null && ! getProgress().isFinished())
+ response.writeContinuation(getProgress().serializeToString());
- response.respond(Response.Status.OK);
- break;
+ response.respond(Response.Status.OK);
+ break;
+ }
default:
- response.writeMessage(message != null ? message : "Visiting failed");
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
dispatcher.execute(() -> {
@@ -873,8 +899,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
});
}
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ };
+ if (parameters.getRemoteDataHandler() == null) {
+ parameters.setLocalDataHandler(new VisitorDataHandler() {
+ @Override public void onMessage(Message m, AckToken token) {
+ if (m instanceof PutDocumentMessage)
+ callback.onDocument(response,
+ ((PutDocumentMessage) m).getDocumentPut().getDocument(),
+ () -> ack(token),
+ errorMessage -> {
+ error.set(errorMessage);
+ controller.abort();
+ });
+ else
+ throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass());
+ }
+ });
+ }
+ parameters.setControlHandler(controller);
+ visits.put(controller, access.createVisitorSession(parameters));
phaser.arriveAndDeregister();
}
catch (ParseException e) {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 449daa4970a..a2fbf3322fe 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -25,7 +25,6 @@ import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.DocumentIdResponse;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Response;
import com.yahoo.documentapi.Result;
@@ -40,6 +39,7 @@ import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
@@ -59,11 +59,13 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalInt;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -174,6 +176,7 @@ public class DocumentV1ApiTest {
@Test
public void testResponses() throws ExecutionException, InterruptedException {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null));
// GET at non-existent path returns 404 with available paths
var response = driver.sendRequest("http://localhost/document/v1/not-found");
assertSameJson("{" +
@@ -191,6 +194,7 @@ public class DocumentV1ApiTest {
assertEquals(404, response.getStatus());
// GET at root is a visit. Numeric parameters have an upper bound.
+ access.expect(tokens);
access.expect(parameters -> {
assertEquals("[Content:cluster=content]", parameters.getRoute().toString());
assertEquals("default", parameters.getBucketSpace());
@@ -200,9 +204,9 @@ public class DocumentV1ApiTest {
assertEquals("(all the things)", parameters.getDocumentSelection());
assertEquals(1000, parameters.getSessionTimeoutMs());
// Put some documents in the response
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0);
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0);
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0);
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
VisitorStatistics statistics = new VisitorStatistics();
statistics.setBucketsVisited(1);
parameters.getControlHandler().onVisitorStatistics(statistics);
@@ -542,6 +546,7 @@ public class DocumentV1ApiTest {
static class MockDocumentAccess extends DocumentAccess {
private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
+ private final Set<AckToken> outstanding = new CopyOnWriteArraySet<>();
private final MockAsyncSession session = new MockAsyncSession();
MockDocumentAccess(DocumentmanagerConfig config) {
@@ -560,18 +565,24 @@ public class DocumentV1ApiTest {
@Override
public VisitorSession createVisitorSession(VisitorParameters parameters) {
- expectations.get().accept(parameters);
- return new VisitorSession() {
+ VisitorSession visitorSession = new VisitorSession() {
+ {
+ parameters.getControlHandler().setSession(this);
+ if (parameters.getLocalDataHandler() != null)
+ parameters.getLocalDataHandler().setSession(this);
+ }
@Override public boolean isDone() { return false; }
@Override public ProgressToken getProgress() { return null; }
@Override public Trace getTrace() { return null; }
@Override public boolean waitUntilDone(long timeoutMs) { return false; }
- @Override public void ack(AckToken token) { }
+ @Override public void ack(AckToken token) { assertTrue(outstanding.remove(token)); }
@Override public void abort() { }
@Override public VisitorResponse getNext() { return null; }
@Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; }
- @Override public void destroy() { }
+ @Override public void destroy() { assertEquals(Set.of(), outstanding); }
};
+ expectations.get().accept(parameters);
+ return visitorSession;
}
@Override
@@ -593,6 +604,10 @@ public class DocumentV1ApiTest {
this.expectations.set(expectations);
}
+ public void expect(Collection<AckToken> tokens) {
+ outstanding.addAll(tokens);
+ }
+
}