diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 13:58:55 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 13:58:55 +0200 |
commit | 28e3d42feebc04bc5385ab854ce6744d2a03b5e1 (patch) | |
tree | a97b9bc490873194afea3ed9d19c00b1d5a64c4c /vespaclient-container-plugin/src | |
parent | 2f39f64ddba3dfe6f0b9dd92d6284f4770003053 (diff) |
Use the default threadpool executor, with minimal blocking
Diffstat (limited to 'vespaclient-container-plugin/src')
2 files changed, 32 insertions, 29 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 711b6388b7a..23bfde986cf 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -8,6 +8,7 @@ import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; @@ -87,6 +88,7 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; @@ -178,11 +180,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); - private final ScheduledExecutorService visitRenderer; + private final Executor defaultExecutor; private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject - public DocumentV1ApiHandler(Metric metric, + public DocumentV1ApiHandler(ContainerThreadPool threadPool, + Metric metric, MetricReceiver metricReceiver, VespaDocumentAccess documentAccess, DocumentmanagerConfig documentManagerConfig, @@ -190,13 +193,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, - Math.max(2, Runtime.getRuntime().availableProcessors() / 4)); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor()); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, - ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, int visitorRendererThreads) { + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) { this.clock = clock; this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); @@ -215,7 +217,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); - visitRenderer = Executors.newScheduledThreadPool(visitorRendererThreads, new DaemonThreadFactory("document-api-handler-renderer-")); + this.defaultExecutor = defaultExecutor; } // ------------------------------------------------ Requests ------------------------------------------------- @@ -275,7 +277,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. dispatcher.shutdown(); visitDispatcher.shutdown(); - visitRenderer.shutdown(); while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchEnqueued(); dispatchVisitEnqueued(); @@ -296,9 +297,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) visitDispatcher.shutdownNow(); - - if ( ! visitRenderer.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) - visitRenderer.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); @@ -1099,34 +1097,39 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void renderDocuments(Deque<Runnable> writes, JsonResponse response, AtomicBoolean done) { - synchronized (response) { - for (Runnable write; (write = writes.poll()) != null; write.run()) ; - if ( ! done.get()) - visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS); - } - } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { visit(request, parameters, streaming, handler, new VisitCallback() { final Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); - final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean writing = new AtomicBoolean(); @Override public void onStart(JsonResponse response) throws IOException { if (streaming) response.commit(Response.Status.OK); response.writeDocumentsArrayStart(); - visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { writes.add(() -> { response.writeDocumentValue(document); ack.run(); }); + if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing. + defaultExecutor.execute(() -> { + for (Runnable write; (write = writes.poll()) != null; write.run()); + writing.set(false); + }); } @Override public void onEnd(JsonResponse response) throws IOException { - done.set(true); - renderDocuments(writes, response, done); + // Wait for other writers to complete, then write what remains here. + while ( ! writing.compareAndSet(false, true)) { + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen"); + Thread.currentThread().interrupt(); + } + } + for (Runnable write; (write = writes.poll()) != null; write.run()); response.writeArrayEnd(); } }); @@ -1144,7 +1147,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { - visitRenderer.execute(() -> { + defaultExecutor.execute(() -> { super.onDone(code, message); loggingException(() -> { try (response) { @@ -1180,10 +1183,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.commit(status); } }); - visitDispatcher.execute(() -> { - phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. - visits.remove(this).destroy(); - }); + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. + visits.remove(this).destroy(); }); } }; diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 51f43e521b9..9234f1cfa6f 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -136,7 +136,8 @@ public class DocumentV1ApiTest { access = new MockDocumentAccess(docConfig); metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); } @After @@ -752,7 +753,8 @@ public class DocumentV1ApiTest { @Test public void testThroughput() throws InterruptedException { DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); int writers = 4; int queueFill = executorConfig.maxThrottled() - writers; |