summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-21 13:58:55 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-21 13:58:55 +0200
commit28e3d42feebc04bc5385ab854ce6744d2a03b5e1 (patch)
treea97b9bc490873194afea3ed9d19c00b1d5a64c4c /vespaclient-container-plugin
parent2f39f64ddba3dfe6f0b9dd92d6284f4770003053 (diff)
Use the default threadpool executor, with minimal blocking
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java55
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java6
2 files changed, 32 insertions, 29 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 711b6388b7a..23bfde986cf 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -8,6 +8,7 @@ import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream;
import com.yahoo.document.Document;
@@ -87,6 +88,7 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
@@ -178,11 +180,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
- private final ScheduledExecutorService visitRenderer;
+ private final Executor defaultExecutor;
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
- public DocumentV1ApiHandler(Metric metric,
+ public DocumentV1ApiHandler(ContainerThreadPool threadPool,
+ Metric metric,
MetricReceiver metricReceiver,
VespaDocumentAccess documentAccess,
DocumentmanagerConfig documentManagerConfig,
@@ -190,13 +193,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig,
- Math.max(2, Runtime.getRuntime().availableProcessors() / 4));
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor());
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
- ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, int visitorRendererThreads) {
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) {
this.clock = clock;
this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
@@ -215,7 +217,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
- visitRenderer = Executors.newScheduledThreadPool(visitorRendererThreads, new DaemonThreadFactory("document-api-handler-renderer-"));
+ this.defaultExecutor = defaultExecutor;
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -275,7 +277,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
dispatcher.shutdown();
visitDispatcher.shutdown();
- visitRenderer.shutdown();
while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchEnqueued();
dispatchVisitEnqueued();
@@ -296,9 +297,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
visitDispatcher.shutdownNow();
-
- if ( ! visitRenderer.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
- visitRenderer.shutdownNow();
}
catch (InterruptedException e) {
log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
@@ -1099,34 +1097,39 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private void renderDocuments(Deque<Runnable> writes, JsonResponse response, AtomicBoolean done) {
- synchronized (response) {
- for (Runnable write; (write = writes.poll()) != null; write.run()) ;
- if ( ! done.get())
- visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS);
- }
- }
-
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
visit(request, parameters, streaming, handler, new VisitCallback() {
final Deque<Runnable> writes = new ConcurrentLinkedDeque<>();
- final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean writing = new AtomicBoolean();
@Override public void onStart(JsonResponse response) throws IOException {
if (streaming)
response.commit(Response.Status.OK);
response.writeDocumentsArrayStart();
- visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS);
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
writes.add(() -> {
response.writeDocumentValue(document);
ack.run();
});
+ if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing.
+ defaultExecutor.execute(() -> {
+ for (Runnable write; (write = writes.poll()) != null; write.run());
+ writing.set(false);
+ });
}
@Override public void onEnd(JsonResponse response) throws IOException {
- done.set(true);
- renderDocuments(writes, response, done);
+ // Wait for other writers to complete, then write what remains here.
+ while ( ! writing.compareAndSet(false, true)) {
+ try {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e) {
+ log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen");
+ Thread.currentThread().interrupt();
+ }
+ }
+ for (Runnable write; (write = writes.poll()) != null; write.run());
response.writeArrayEnd();
}
});
@@ -1144,7 +1147,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
- visitRenderer.execute(() -> {
+ defaultExecutor.execute(() -> {
super.onDone(code, message);
loggingException(() -> {
try (response) {
@@ -1180,10 +1183,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
- visitDispatcher.execute(() -> {
- phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
- visits.remove(this).destroy();
- });
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
+ visits.remove(this).destroy();
});
}
};
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 51f43e521b9..9234f1cfa6f 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -136,7 +136,8 @@ public class DocumentV1ApiTest {
access = new MockDocumentAccess(docConfig);
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
}
@After
@@ -752,7 +753,8 @@ public class DocumentV1ApiTest {
@Test
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;