diff options
Diffstat (limited to 'vespaclient-container-plugin/src')
2 files changed, 28 insertions, 40 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index d30a75ea920..1e9cdfbbbea 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -9,7 +9,6 @@ import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.jdisc.ContentChannelOutputStream; -import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; @@ -92,7 +91,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -181,12 +179,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); - private final Executor defaultExecutor; private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject - public DocumentV1ApiHandler(Executor defaultExecutor, - Metric metric, + public DocumentV1ApiHandler(Metric metric, MetricReceiver metricReceiver, VespaDocumentAccess documentAccess, DocumentmanagerConfig documentManagerConfig, @@ -194,12 +190,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, defaultExecutor); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, - ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) { + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { this.clock = clock; this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); @@ -218,7 +214,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); - this.defaultExecutor = defaultExecutor; } // ------------------------------------------------ Requests ------------------------------------------------- @@ -582,23 +577,23 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Class for writing and returning JSON responses to document operations in a thread safe manner. */ private static class JsonResponse implements AutoCloseable { + private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]); + private final BufferedContentChannel buffer = new BufferedContentChannel(); - private final OutputStream out; + private final OutputStream out = new ContentChannelOutputStream(buffer); private final JsonGenerator json; private final ResponseHandler handler; private ContentChannel channel; - private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException { + private JsonResponse(ResponseHandler handler) throws IOException { this.handler = handler; - out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24) - : new ContentChannelOutputStream(buffer); json = jsonFactory.createGenerator(out); json.writeStartObject(); } /** Creates a new JsonResponse with path and id fields written. */ static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler, false); + JsonResponse response = new JsonResponse(handler); response.writePathId(path.rawPath()); response.writeDocId(path.id()); return response; @@ -606,12 +601,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Creates a new JsonResponse with path field written. */ static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException { - return create(request, handler, false); - } - - /** Creates a new JsonResponse with path field written. */ - static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException { - JsonResponse response = new JsonResponse(handler, streaming); + JsonResponse response = new JsonResponse(handler); response.writePathId(request.getUri().getRawPath()); return response; } @@ -704,8 +694,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } - synchronized void writeDocumentValue(Document document) { + synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) { new JsonWriter(json).write(document); + if (completionHandler != null) + buffer.write(emptyBuffer, completionHandler); } synchronized void writeArrayEnd() throws IOException { @@ -1107,8 +1099,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { visit(request, parameters, streaming, handler, new VisitCallback() { - final Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); - final AtomicBoolean writing = new AtomicBoolean(); @Override public void onStart(JsonResponse response) throws IOException { if (streaming) response.commit(Response.Status.OK); @@ -1116,20 +1106,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - writes.add(() -> { - loggingException(() -> response.writeDocumentValue(document)); - ack.run(); - }); - if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing. - defaultExecutor.execute(() -> { - while (writing.get()) { - for (Runnable write; (write = writes.poll()) != null; write.run()); - writing.set( ! writes.isEmpty()); - } + if (streaming) + response.writeDocumentValue(document, new CompletionHandler() { + @Override public void completed() { ack.run();} + @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } }); + else { + response.writeDocumentValue(document, null); + ack.run(); + } } @Override public void onEnd(JsonResponse response) throws IOException { - for (Runnable write; (write = writes.poll()) != null; write.run()); response.writeArrayEnd(); } }); @@ -1141,7 +1128,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { try { - JsonResponse response = JsonResponse.create(request, handler, streaming); + JsonResponse response = JsonResponse.create(request, handler); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); @@ -1164,7 +1151,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { status = Response.Status.GATEWAY_TIMEOUT; break; } - // TODO jonmv: always supply and document continuation? case SUCCESS: // Intentional fallthrough. case ABORTED: // Intentional fallthrough. if (error.get() == null) { @@ -1182,8 +1168,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.commit(status); } }); - phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. - visits.remove(this).destroy(); + visitDispatcher.execute(() -> { + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. + visits.remove(this).destroy(); + }); } }; if (parameters.getRemoteDataHandler() == null) { diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 1629777f837..0e62b620828 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -137,7 +137,7 @@ public class DocumentV1ApiTest { metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, - executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); + executorConfig, clusterConfig, bucketConfig); } @After @@ -183,7 +183,7 @@ public class DocumentV1ApiTest { } @Test - public void testResponses() throws InterruptedException { + public void testResponses() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths @@ -756,7 +756,7 @@ public class DocumentV1ApiTest { public void testThroughput() throws InterruptedException { DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, - executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2)); + executorConfig, clusterConfig, bucketConfig); int writers = 4; int queueFill = executorConfig.maxThrottled() - writers; |