diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 17:48:19 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 17:48:19 +0200 |
commit | 9e2cbe370e498fe0a89c8abae2a8b23344cf3f6a (patch) | |
tree | faf8464e04acf1a943a2b751f3745a2c74c1dc29 /vespaclient-container-plugin/src | |
parent | 56c3fc7c2a3b7e317e79593aa56ed2d03472cbde (diff) |
Revert "Merge pull request #19686 from vespa-engine/jonmv/revert-streamed-visits"
This reverts commit 56c3fc7c2a3b7e317e79593aa56ed2d03472cbde, reversing
changes made to 367dae08c390833a54c1bae11282df5a7e056d16.
Diffstat (limited to 'vespaclient-container-plugin/src')
2 files changed, 170 insertions, 58 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index aa96b0932c3..a3e3c512dcd 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -8,7 +8,9 @@ import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.ContentChannelOutputStream; +import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; @@ -86,10 +88,12 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -158,6 +162,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String TIME_CHUNK = "timeChunk"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; + private static final String STREAM = "stream"; private final Clock clock; private final Duration handlerTimeout; @@ -175,10 +180,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); + private final Executor defaultExecutor; private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject - public DocumentV1ApiHandler(Metric metric, + public DocumentV1ApiHandler(ContainerThreadPool threadPool, + Metric metric, MetricReceiver metricReceiver, VespaDocumentAccess documentAccess, DocumentmanagerConfig documentManagerConfig, @@ -186,12 +193,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor()); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, - ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) { this.clock = clock; this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); @@ -210,6 +217,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); + this.defaultExecutor = defaultExecutor; } // ------------------------------------------------ Requests ------------------------------------------------- @@ -355,9 +363,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - VisitorParameters parameters = parseGetParameters(request, path); + boolean streaming = getProperty(request, STREAM, booleanParser).orElse(false); + VisitorParameters parameters = parseGetParameters(request, path, streaming); return () -> { - visitAndWrite(request, parameters, handler); + visitAndWrite(request, parameters, handler, streaming); return true; // VisitorSession has its own throttle handling. }; }); @@ -573,19 +582,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static class JsonResponse implements AutoCloseable { private final BufferedContentChannel buffer = new BufferedContentChannel(); - private final OutputStream out = new ContentChannelOutputStream(buffer); - private final JsonGenerator json = jsonFactory.createGenerator(out); + private final OutputStream out; + private final JsonGenerator json; private final ResponseHandler handler; private ContentChannel channel; - private JsonResponse(ResponseHandler handler) throws IOException { + private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException { this.handler = handler; + out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24) + : new ContentChannelOutputStream(buffer); + json = jsonFactory.createGenerator(out); json.writeStartObject(); } /** Creates a new JsonResponse with path and id fields written. */ static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); + JsonResponse response = new JsonResponse(handler, false); response.writePathId(path.rawPath()); response.writeDocId(path.id()); return response; @@ -593,15 +605,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Creates a new JsonResponse with path field written. */ static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); + return create(request, handler, false); + } + + /** Creates a new JsonResponse with path field written. */ + static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException { + JsonResponse response = new JsonResponse(handler, streaming); response.writePathId(request.getUri().getRawPath()); return response; } /** Creates a new JsonResponse with path and message fields written. */ static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); - response.writePathId(request.getUri().getRawPath()); + JsonResponse response = create(request, handler); response.writeMessage(message); return response; } @@ -955,8 +971,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) { - int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1)); + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) { + int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10, + getProperty(request, WANTED_DOCUMENT_COUNT, integerParser) + .orElse(streaming ? Integer.MAX_VALUE : 1)); if (wantedDocumentCount <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); @@ -1015,10 +1033,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Called at the start of response rendering. */ default void onStart(JsonResponse response) throws IOException { } - /** Called for every document received from backend visitors — must call the ack for these to proceed. */ + /** Called for every document received from backend visitors—must call the ack for these to proceed. */ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } - /** Called at the end of response rendering, before generic status data is written. */ + /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */ default void onEnd(JsonResponse response) throws IOException { } } @@ -1042,7 +1060,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { - visit(request, parameters, handler, new VisitCallback() { + visit(request, parameters, false, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { @@ -1079,66 +1097,92 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, handler, new VisitCallback() { + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { + visit(request, parameters, streaming, handler, new VisitCallback() { + final Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); + final AtomicBoolean writing = new AtomicBoolean(); @Override public void onStart(JsonResponse response) throws IOException { + if (streaming) + response.commit(Response.Status.OK); + response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - response.writeDocumentValue(document); - ack.run(); + writes.add(() -> { + response.writeDocumentValue(document); + ack.run(); + }); + if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing. + defaultExecutor.execute(() -> { + for (Runnable write; (write = writes.poll()) != null; write.run()); + writing.set(false); + }); } @Override public void onEnd(JsonResponse response) throws IOException { + // Wait for other writers to complete, then write what remains here. + while ( ! writing.compareAndSet(false, true)) { + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen"); + Thread.currentThread().interrupt(); + } + } + for (Runnable write; (write = writes.poll()) != null; write.run()); response.writeArrayEnd(); } }); } private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, handler, new VisitCallback() { }); + visit(request, parameters, false, handler, new VisitCallback() { }); } - private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { + private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { try { - JsonResponse response = JsonResponse.create(request, handler); + JsonResponse response = JsonResponse.create(request, handler, streaming); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { - super.onDone(code, message); - loggingException(() -> { - callback.onEnd(response); - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { - response.writeMessage("No buckets visited within timeout of " + - parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); - response.respond(Response.Status.GATEWAY_TIMEOUT); - break; - } - case SUCCESS: // Intentional fallthrough. - case ABORTED: // Intentional fallthrough. - if (error.get() == null) { - ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); - if (progress != null && ! progress.isFinished()) - response.writeContinuation(progress.serializeToString()); + defaultExecutor.execute(() -> { + super.onDone(code, message); + loggingException(() -> { + try (response) { + callback.onEnd(response); - if (getVisitorStatistics() != null) - response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - - response.respond(Response.Status.OK); - break; - } - default: - response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); if (getVisitorStatistics() != null) response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - response.respond(Response.Status.BAD_GATEWAY); - } - }); - visitDispatcher.execute(() -> { + int status = Response.Status.BAD_GATEWAY; + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { + response.writeMessage("No buckets visited within timeout of " + + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); + status = Response.Status.GATEWAY_TIMEOUT; + break; + } + // TODO jonmv: always supply and document continuation? + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (error.get() == null) { + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + + status = Response.Status.OK; + break; + } + default: + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); + } + if ( ! streaming) + response.commit(status); + } + }); phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 96700f08823..b23533a720e 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -4,6 +4,7 @@ package com.yahoo.document.restapi.resource; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.container.jdisc.RequestHandlerTestDriver; import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.document.BucketId; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; @@ -37,6 +38,7 @@ import com.yahoo.documentapi.UpdateResponse; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorDestinationParameters; import com.yahoo.documentapi.VisitorDestinationSession; +import com.yahoo.documentapi.VisitorIterator; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.VisitorSession; @@ -134,7 +136,8 @@ public class DocumentV1ApiTest { access = new MockDocumentAccess(docConfig); metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); } @After @@ -245,19 +248,83 @@ public class DocumentV1ApiTest { "}", response.readAll()); assertEquals(200, response.getStatus()); + // GET at root is a visit. Streaming mode can be specified with &stream=true + access.expect(tokens); + access.expect(parameters -> { + assertEquals("content", parameters.getRoute().toString()); + assertEquals("default", parameters.getBucketSpace()); + assertEquals(1025, parameters.getMaxTotalHits()); // Not bounded likewise for streamed responses. + assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); + assertEquals("[id]", parameters.getFieldSet()); + assertEquals("(all the things)", parameters.getDocumentSelection()); + assertEquals(6000, parameters.getSessionTimeoutMs()); + // Put some documents in the response + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); + VisitorStatistics statistics = new VisitorStatistics(); + statistics.setBucketsVisited(1); + statistics.setDocumentsVisited(3); + parameters.getControlHandler().onVisitorStatistics(statistics); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK"); + }); + response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + + "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true"); + assertSameJson("{" + + " \"pathId\": \"/document/v1\"," + + " \"documents\": [" + + " {" + + " \"id\": \"id:space:music::one\"," + + " \"fields\": {" + + " \"artist\": \"Tom Waits\"" + + " }" + + " }," + + " {" + + " \"id\": \"id:space:music:n=1:two\"," + + " \"fields\": {" + + " \"artist\": \"Asa-Chan & Jun-Ray\"" + + " }" + + " }," + + " {" + + " \"id\": \"id:space:music:g=a:three\"," + + " \"fields\": {}" + + " }" + + " ]," + + " \"documentCount\": 3" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + // GET with namespace and document type is a restricted visit. + ProgressToken progress = new ProgressToken(); + VisitorIterator.createFromExplicitBucketSet(Set.of(new BucketId(1), new BucketId(2)), 8, progress) + .update(new BucketId(1), new BucketId(1)); access.expect(parameters -> { assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection()); - assertEquals(new ProgressToken().serializeToString(), parameters.getResumeToken().serializeToString()); + assertEquals(progress.serializeToString(), parameters.getResumeToken().serializeToString()); throw new IllegalArgumentException("parse failure"); }); - response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString()); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + progress.serializeToString()); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid\"," + " \"message\": \"parse failure\"" + "}", response.readAll()); assertEquals(400, response.getStatus()); + // GET when a streamed visit returns status code 200 also when errors occur. + access.expect(parameters -> { + assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection()); + parameters.getControlHandler().onProgress(progress); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "failure?"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?stream=true"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"documents\": []," + + //" \"continuation\": \"" + progress.serializeToString() + "\"," + + " \"message\": \"failure?\"" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + // POST with namespace and document type is a restricted visit with a required destination cluster ("destinationCluster") access.expect(parameters -> { fail("Not supposed to run"); @@ -686,7 +753,8 @@ public class DocumentV1ApiTest { @Test public void testThroughput() throws InterruptedException { DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); int writers = 4; int queueFill = executorConfig.maxThrottled() - writers; @@ -737,7 +805,7 @@ public class DocumentV1ApiTest { replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS); return new Result(0); }); - // Send the rest of the documents. Rely on resender to empty queue of throttled oppperations. + // Send the rest of the documents. Rely on resender to empty queue of throttled operations. for (int i = queueFill; i < docs; i++) { int j = i; writer.execute(() -> { |