diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 150 |
1 files changed, 53 insertions, 97 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 23bfde986cf..aa96b0932c3 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -8,9 +8,7 @@ import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; -import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.ContentChannelOutputStream; -import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; @@ -88,12 +86,10 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -162,7 +158,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String TIME_CHUNK = "timeChunk"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; - private static final String STREAMING = "streaming"; private final Clock clock; private final Duration handlerTimeout; @@ -180,12 +175,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); - private final Executor defaultExecutor; private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject - public DocumentV1ApiHandler(ContainerThreadPool threadPool, - Metric metric, + public DocumentV1ApiHandler(Metric metric, MetricReceiver metricReceiver, VespaDocumentAccess documentAccess, DocumentmanagerConfig documentManagerConfig, @@ -193,12 +186,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor()); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, - ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) { + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { this.clock = clock; this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); @@ -217,7 +210,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); - this.defaultExecutor = defaultExecutor; } // ------------------------------------------------ Requests ------------------------------------------------- @@ -363,10 +355,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - boolean streaming = getProperty(request, STREAMING, booleanParser).orElse(false); - VisitorParameters parameters = parseGetParameters(request, path, streaming); + VisitorParameters parameters = parseGetParameters(request, path); return () -> { - visitAndWrite(request, parameters, handler, streaming); + visitAndWrite(request, parameters, handler); return true; // VisitorSession has its own throttle handling. }; }); @@ -582,22 +573,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static class JsonResponse implements AutoCloseable { private final BufferedContentChannel buffer = new BufferedContentChannel(); - private final OutputStream out; - private final JsonGenerator json; + private final OutputStream out = new ContentChannelOutputStream(buffer); + private final JsonGenerator json = jsonFactory.createGenerator(out); private final ResponseHandler handler; private ContentChannel channel; - private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException { + private JsonResponse(ResponseHandler handler) throws IOException { this.handler = handler; - out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24) - : new ContentChannelOutputStream(buffer); - json = jsonFactory.createGenerator(out); json.writeStartObject(); } /** Creates a new JsonResponse with path and id fields written. */ static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler, false); + JsonResponse response = new JsonResponse(handler); response.writePathId(path.rawPath()); response.writeDocId(path.id()); return response; @@ -605,19 +593,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Creates a new JsonResponse with path field written. */ static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException { - return create(request, handler, false); - } - - /** Creates a new JsonResponse with path field written. */ - static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException { - JsonResponse response = new JsonResponse(handler, streaming); + JsonResponse response = new JsonResponse(handler); response.writePathId(request.getUri().getRawPath()); return response; } /** Creates a new JsonResponse with path and message fields written. */ static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException { - JsonResponse response = create(request, handler); + JsonResponse response = new JsonResponse(handler); + response.writePathId(request.getUri().getRawPath()); response.writeMessage(message); return response; } @@ -971,10 +955,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) { - int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10, - getProperty(request, WANTED_DOCUMENT_COUNT, integerParser) - .orElse(streaming ? Integer.MAX_VALUE : 1)); + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) { + int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1)); if (wantedDocumentCount <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); @@ -1033,10 +1015,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Called at the start of response rendering. */ default void onStart(JsonResponse response) throws IOException { } - /** Called for every document received from backend visitors—must call the ack for these to proceed. */ + /** Called for every document received from backend visitors — must call the ack for these to proceed. */ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } - /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */ + /** Called at the end of response rendering, before generic status data is written. */ default void onEnd(JsonResponse response) throws IOException { } } @@ -1060,7 +1042,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { - visit(request, parameters, false, handler, new VisitCallback() { + visit(request, parameters, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { @@ -1097,92 +1079,66 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { - visit(request, parameters, streaming, handler, new VisitCallback() { - final Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); - final AtomicBoolean writing = new AtomicBoolean(); + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { - if (streaming) - response.commit(Response.Status.OK); - response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - writes.add(() -> { - response.writeDocumentValue(document); - ack.run(); - }); - if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing. - defaultExecutor.execute(() -> { - for (Runnable write; (write = writes.poll()) != null; write.run()); - writing.set(false); - }); + response.writeDocumentValue(document); + ack.run(); } @Override public void onEnd(JsonResponse response) throws IOException { - // Wait for other writers to complete, then write what remains here. - while ( ! writing.compareAndSet(false, true)) { - try { - Thread.sleep(1); - } - catch (InterruptedException e) { - log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen"); - Thread.currentThread().interrupt(); - } - } - for (Runnable write; (write = writes.poll()) != null; write.run()); response.writeArrayEnd(); } }); } private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, false, handler, new VisitCallback() { }); + visit(request, parameters, handler, new VisitCallback() { }); } - private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { - JsonResponse response = JsonResponse.create(request, handler, streaming); + JsonResponse response = JsonResponse.create(request, handler); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { - defaultExecutor.execute(() -> { - super.onDone(code, message); - loggingException(() -> { - try (response) { - callback.onEnd(response); + super.onDone(code, message); + loggingException(() -> { + callback.onEnd(response); + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { + response.writeMessage("No buckets visited within timeout of " + + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); + response.respond(Response.Status.GATEWAY_TIMEOUT); + break; + } + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (error.get() == null) { + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); + + response.respond(Response.Status.OK); + break; + } + default: + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); if (getVisitorStatistics() != null) response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - int status = Response.Status.BAD_GATEWAY; - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { - response.writeMessage("No buckets visited within timeout of " + - parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); - status = Response.Status.GATEWAY_TIMEOUT; - break; - } - // TODO jonmv: always supply and document continuation? - case SUCCESS: // Intentional fallthrough. - case ABORTED: // Intentional fallthrough. - if (error.get() == null) { - ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); - if (progress != null && ! progress.isFinished()) - response.writeContinuation(progress.serializeToString()); - - status = Response.Status.OK; - break; - } - default: - response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); - } - if ( ! streaming) - response.commit(status); - } - }); + response.respond(Response.Status.BAD_GATEWAY); + } + }); + visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); |