diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-20 16:25:26 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-20 16:25:26 +0200 |
commit | 85b973c8d7614070b724bee57e22439682983bbe (patch) | |
tree | 201a0e9c552646b9855cce707dd479fee3866972 /vespaclient-container-plugin | |
parent | 6ad5a779f95141d0b6de1597bcb4cde6f91814c6 (diff) |
Write visited documents from a dedicated executor to avoid blocking mbus
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 68 insertions, 49 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 865aed221c5..77e84cffd25 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -177,6 +177,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); + private final ScheduledExecutorService visitRenderer; private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject @@ -188,12 +189,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, + Math.max(2, Runtime.getRuntime().availableProcessors() / 4)); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, - ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, int visitorRendererThreads) { this.clock = clock; this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); @@ -212,6 +214,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); + visitRenderer = Executors.newScheduledThreadPool(visitorRendererThreads, new DaemonThreadFactory("document-api-handler-renderer-")); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -271,6 +274,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. dispatcher.shutdown(); visitDispatcher.shutdown(); + visitRenderer.shutdown(); while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchEnqueued(); dispatchVisitEnqueued(); @@ -291,6 +295,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) visitDispatcher.shutdownNow(); + + if ( ! visitRenderer.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + visitRenderer.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); @@ -1025,10 +1032,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Called at the start of response rendering. */ default void onStart(JsonResponse response) throws IOException { } - /** Called for every document received from backend visitors — must call the ack for these to proceed. */ + /** Called for every document received from backend visitors—must call the ack for these to proceed. */ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } - /** Called at the end of response rendering, before generic status data is written. */ + /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */ default void onEnd(JsonResponse response) throws IOException { } } @@ -1091,6 +1098,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { visit(request, parameters, streaming, handler, new VisitCallback() { + AtomicLong acks = new AtomicLong(); @Override public void onStart(JsonResponse response) throws IOException { if (streaming) response.commit(Response.Status.OK); @@ -1098,10 +1106,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - response.writeDocumentValue(document); - ack.run(); + acks.incrementAndGet(); + visitRenderer.execute(() -> { + response.writeDocumentValue(document); + ack.run(); + acks.decrementAndGet(); + }); } @Override public void onEnd(JsonResponse response) throws IOException { + try { + while (acks.get() > 0) + Thread.sleep(1); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } response.writeArrayEnd(); } }); @@ -1113,52 +1132,52 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { try { - JsonResponse response = JsonResponse.create(request, handler); + JsonResponse response = JsonResponse.create(request, handler, streaming); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { - super.onDone(code, message); - loggingException(() -> { - try (response) { - callback.onEnd(response); - - if (getVisitorStatistics() != null) - response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - - int status = Response.Status.BAD_GATEWAY; - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { - response.writeMessage("No buckets visited within timeout of " + - parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); - status = Response.Status.GATEWAY_TIMEOUT; - break; - } - // TODO jonmv: [test] limit pending, - // TODO jonmv: [test] abort on shutdown, - // TODO jonmv: always supply and document continuation? - case SUCCESS: // Intentional fallthrough. - case ABORTED: // Intentional fallthrough. - if (error.get() == null) { - ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); - if (progress != null && ! progress.isFinished()) - response.writeContinuation(progress.serializeToString()); - - status = Response.Status.OK; - break; - } - default: - response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); + visitRenderer.execute(() -> { + super.onDone(code, message); + loggingException(() -> { + try (response) { + callback.onEnd(response); + + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); + + int status = Response.Status.BAD_GATEWAY; + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { + response.writeMessage("No buckets visited within timeout of " + + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); + status = Response.Status.GATEWAY_TIMEOUT; + break; + } + // TODO jonmv: always supply and document continuation? + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (error.get() == null) { + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + + status = Response.Status.OK; + break; + } + default: + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); + } + if ( ! streaming) + response.commit(status); } - if ( ! streaming) - response.commit(status); - } - }); - visitDispatcher.execute(() -> { - phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. - visits.remove(this).destroy(); + }); + visitDispatcher.execute(() -> { + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. + visits.remove(this).destroy(); + }); }); } }; diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 829a9bcab9f..1f2eb8f83ae 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -136,7 +136,7 @@ public class DocumentV1ApiTest { access = new MockDocumentAccess(docConfig); metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1); } @After @@ -752,7 +752,7 @@ public class DocumentV1ApiTest { @Test public void testThroughput() throws InterruptedException { DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1); int writers = 4; int queueFill = executorConfig.maxThrottled() - writers; @@ -803,7 +803,7 @@ public class DocumentV1ApiTest { replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS); return new Result(0); }); - // Send the rest of the documents. Rely on resender to empty queue of throttled oppperations. + // Send the rest of the documents. Rely on resender to empty queue of throttled operations. for (int i = queueFill; i < docs; i++) { int j = i; writer.execute(() -> { |