diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-02-02 14:05:28 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-02-03 09:54:15 +0100 |
commit | ad290b1171641a1c2a6ea8f16c3995deefb77ed6 (patch) | |
tree | 3498c785b78d5838c264475ffd3686238f23ea16 /vespaclient-container-plugin | |
parent | 813afe71808808ef983003be90c6e8bcc5c2ac0a (diff) |
Refactor to allow more flex in reacting to documents received
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 88 insertions, 30 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 7330d21d830..48e5f37a4f8 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,23 +20,26 @@ import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AckToken; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorDataHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; @@ -51,6 +54,7 @@ import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.Message; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.TraceNode; @@ -85,6 +89,7 @@ import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; @@ -304,7 +309,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseParameters(request, path); return () -> { - visit(request, parameters, handler); + visitAndWrite(request, parameters, handler); return true; // VisitorSession has its own throttle handling. }; }); @@ -830,24 +835,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return parameters; } - private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + private interface VisitCallback { + /** Called at the start of response rendering. */ + default void onStart(JsonResponse response) throws IOException { } + + /** Called for every document received from backend visitors — must call the ack for these to proceed. */ + default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } + + /** Called at the end of response rendering, before generic status data is written. */ + default void onEnd(JsonResponse response) throws IOException { } + } + + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onStart(JsonResponse response) throws IOException { + response.writeDocumentsArrayStart(); + } + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + response.writeDocumentValue(document); + ack.run(); + } + @Override public void onEnd(JsonResponse response) throws IOException { + response.writeArrayEnd(); + } + }); + } + + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); - response.writeDocumentsArrayStart(); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. - parameters.setLocalDataHandler(new DumpVisitorDataHandler() { - @Override public void onDocument(Document doc, long timeStamp) { - loggingException(() -> { - response.writeDocumentValue(doc); - }); - } - @Override public void onRemove(DocumentId id) { } // We don't visit removes. - }); - parameters.setControlHandler(new VisitorControlHandler() { + AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. + callback.onStart(response); + VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { - response.writeArrayEnd(); // Close "documents" array. + callback.onEnd(response); switch (code) { case TIMEOUT: if ( ! hasVisitedAnyBuckets()) { @@ -858,13 +882,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } case SUCCESS: // Intentional fallthrough. case ABORTED: // Intentional fallthrough. - if (getProgress() != null && ! getProgress().isFinished()) - response.writeContinuation(getProgress().serializeToString()); + if (error.get() == null) { + if (getProgress() != null && ! getProgress().isFinished()) + response.writeContinuation(getProgress().serializeToString()); - response.respond(Response.Status.OK); - break; + response.respond(Response.Status.OK); + break; + } default: - response.writeMessage(message != null ? message : "Visiting failed"); + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } dispatcher.execute(() -> { @@ -873,8 +899,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); }); } - }); - visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + }; + if (parameters.getRemoteDataHandler() == null) { + parameters.setLocalDataHandler(new VisitorDataHandler() { + @Override public void onMessage(Message m, AckToken token) { + if (m instanceof PutDocumentMessage) + callback.onDocument(response, + ((PutDocumentMessage) m).getDocumentPut().getDocument(), + () -> ack(token), + errorMessage -> { + error.set(errorMessage); + controller.abort(); + }); + else + throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass()); + } + }); + } + parameters.setControlHandler(controller); + visits.put(controller, access.createVisitorSession(parameters)); phaser.arriveAndDeregister(); } catch (ParseException e) { diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 449daa4970a..a2fbf3322fe 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -25,7 +25,6 @@ import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.DocumentIdResponse; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; import com.yahoo.documentapi.Result; @@ -40,6 +39,7 @@ import com.yahoo.documentapi.VisitorDestinationSession; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; @@ -59,11 +59,13 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; +import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -174,6 +176,7 @@ public class DocumentV1ApiTest { @Test public void testResponses() throws ExecutionException, InterruptedException { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths var response = driver.sendRequest("http://localhost/document/v1/not-found"); assertSameJson("{" + @@ -191,6 +194,7 @@ public class DocumentV1ApiTest { assertEquals(404, response.getStatus()); // GET at root is a visit. Numeric parameters have an upper bound. + access.expect(tokens); access.expect(parameters -> { assertEquals("[Content:cluster=content]", parameters.getRoute().toString()); assertEquals("default", parameters.getBucketSpace()); @@ -200,9 +204,9 @@ public class DocumentV1ApiTest { assertEquals("(all the things)", parameters.getDocumentSelection()); assertEquals(1000, parameters.getSessionTimeoutMs()); // Put some documents in the response - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0); - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0); - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); VisitorStatistics statistics = new VisitorStatistics(); statistics.setBucketsVisited(1); parameters.getControlHandler().onVisitorStatistics(statistics); @@ -542,6 +546,7 @@ public class DocumentV1ApiTest { static class MockDocumentAccess extends DocumentAccess { private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>(); + private final Set<AckToken> outstanding = new CopyOnWriteArraySet<>(); private final MockAsyncSession session = new MockAsyncSession(); MockDocumentAccess(DocumentmanagerConfig config) { @@ -560,18 +565,24 @@ public class DocumentV1ApiTest { @Override public VisitorSession createVisitorSession(VisitorParameters parameters) { - expectations.get().accept(parameters); - return new VisitorSession() { + VisitorSession visitorSession = new VisitorSession() { + { + parameters.getControlHandler().setSession(this); + if (parameters.getLocalDataHandler() != null) + parameters.getLocalDataHandler().setSession(this); + } @Override public boolean isDone() { return false; } @Override public ProgressToken getProgress() { return null; } @Override public Trace getTrace() { return null; } @Override public boolean waitUntilDone(long timeoutMs) { return false; } - @Override public void ack(AckToken token) { } + @Override public void ack(AckToken token) { assertTrue(outstanding.remove(token)); } @Override public void abort() { } @Override public VisitorResponse getNext() { return null; } @Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; } - @Override public void destroy() { } + @Override public void destroy() { assertEquals(Set.of(), outstanding); } }; + expectations.get().accept(parameters); + return visitorSession; } @Override @@ -593,6 +604,10 @@ public class DocumentV1ApiTest { this.expectations.set(expectations); } + public void expect(Collection<AckToken> tokens) { + outstanding.addAll(tokens); + } + } |