aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-20 16:25:26 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-20 16:25:26 +0200
commit85b973c8d7614070b724bee57e22439682983bbe (patch)
tree201a0e9c552646b9855cce707dd479fee3866972 /vespaclient-container-plugin
parent6ad5a779f95141d0b6de1597bcb4cde6f91814c6 (diff)
Write visited documents from a dedicated executor to avoid blocking mbus
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java111
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java6
2 files changed, 68 insertions, 49 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 865aed221c5..77e84cffd25 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -177,6 +177,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
+ private final ScheduledExecutorService visitRenderer;
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
@@ -188,12 +189,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig,
+ Math.max(2, Runtime.getRuntime().availableProcessors() / 4));
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
- ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, int visitorRendererThreads) {
this.clock = clock;
this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
@@ -212,6 +214,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
+ visitRenderer = Executors.newScheduledThreadPool(visitorRendererThreads, new DaemonThreadFactory("document-api-handler-renderer-"));
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -271,6 +274,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
dispatcher.shutdown();
visitDispatcher.shutdown();
+ visitRenderer.shutdown();
while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchEnqueued();
dispatchVisitEnqueued();
@@ -291,6 +295,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
visitDispatcher.shutdownNow();
+
+ if ( ! visitRenderer.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ visitRenderer.shutdownNow();
}
catch (InterruptedException e) {
log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
@@ -1025,10 +1032,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Called at the start of response rendering. */
default void onStart(JsonResponse response) throws IOException { }
- /** Called for every document received from backend visitors — must call the ack for these to proceed. */
+ /** Called for every document received from backend visitors—must call the ack for these to proceed. */
default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
- /** Called at the end of response rendering, before generic status data is written. */
+ /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
default void onEnd(JsonResponse response) throws IOException { }
}
@@ -1091,6 +1098,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
visit(request, parameters, streaming, handler, new VisitCallback() {
+ AtomicLong acks = new AtomicLong();
@Override public void onStart(JsonResponse response) throws IOException {
if (streaming)
response.commit(Response.Status.OK);
@@ -1098,10 +1106,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- response.writeDocumentValue(document);
- ack.run();
+ acks.incrementAndGet();
+ visitRenderer.execute(() -> {
+ response.writeDocumentValue(document);
+ ack.run();
+ acks.decrementAndGet();
+ });
}
@Override public void onEnd(JsonResponse response) throws IOException {
+ try {
+ while (acks.get() > 0)
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
response.writeArrayEnd();
}
});
@@ -1113,52 +1132,52 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
try {
- JsonResponse response = JsonResponse.create(request, handler);
+ JsonResponse response = JsonResponse.create(request, handler, streaming);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- loggingException(() -> {
- try (response) {
- callback.onEnd(response);
-
- if (getVisitorStatistics() != null)
- response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
-
- int status = Response.Status.BAD_GATEWAY;
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
- response.writeMessage("No buckets visited within timeout of " +
- parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
- status = Response.Status.GATEWAY_TIMEOUT;
- break;
- }
- // TODO jonmv: [test] limit pending,
- // TODO jonmv: [test] abort on shutdown,
- // TODO jonmv: always supply and document continuation?
- case SUCCESS: // Intentional fallthrough.
- case ABORTED: // Intentional fallthrough.
- if (error.get() == null) {
- ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
- if (progress != null && ! progress.isFinished())
- response.writeContinuation(progress.serializeToString());
-
- status = Response.Status.OK;
- break;
- }
- default:
- response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
+ visitRenderer.execute(() -> {
+ super.onDone(code, message);
+ loggingException(() -> {
+ try (response) {
+ callback.onEnd(response);
+
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
+
+ int status = Response.Status.BAD_GATEWAY;
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " +
+ parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
+ status = Response.Status.GATEWAY_TIMEOUT;
+ break;
+ }
+ // TODO jonmv: always supply and document continuation?
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (error.get() == null) {
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+
+ status = Response.Status.OK;
+ break;
+ }
+ default:
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
+ }
+ if ( ! streaming)
+ response.commit(status);
}
- if ( ! streaming)
- response.commit(status);
- }
- });
- visitDispatcher.execute(() -> {
- phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
- visits.remove(this).destroy();
+ });
+ visitDispatcher.execute(() -> {
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
+ visits.remove(this).destroy();
+ });
});
}
};
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 829a9bcab9f..1f2eb8f83ae 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -136,7 +136,7 @@ public class DocumentV1ApiTest {
access = new MockDocumentAccess(docConfig);
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1);
}
@After
@@ -752,7 +752,7 @@ public class DocumentV1ApiTest {
@Test
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig, 1);
int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;
@@ -803,7 +803,7 @@ public class DocumentV1ApiTest {
replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
return new Result(0);
});
- // Send the rest of the documents. Rely on resender to empty queue of throttled oppperations.
+ // Send the rest of the documents. Rely on resender to empty queue of throttled operations.
for (int i = queueFill; i < docs; i++) {
int j = i;
writer.execute(() -> {