summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java150
1 files changed, 53 insertions, 97 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 23bfde986cf..aa96b0932c3 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -8,9 +8,7 @@ import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
-import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
-import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentOperation;
@@ -88,12 +86,10 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -162,7 +158,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TIME_CHUNK = "timeChunk";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
- private static final String STREAMING = "streaming";
private final Clock clock;
private final Duration handlerTimeout;
@@ -180,12 +175,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
- private final Executor defaultExecutor;
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
- public DocumentV1ApiHandler(ContainerThreadPool threadPool,
- Metric metric,
+ public DocumentV1ApiHandler(Metric metric,
MetricReceiver metricReceiver,
VespaDocumentAccess documentAccess,
DocumentmanagerConfig documentManagerConfig,
@@ -193,12 +186,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor());
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
- ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) {
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
this.clock = clock;
this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
@@ -217,7 +210,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
- this.defaultExecutor = defaultExecutor;
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -363,10 +355,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
- boolean streaming = getProperty(request, STREAMING, booleanParser).orElse(false);
- VisitorParameters parameters = parseGetParameters(request, path, streaming);
+ VisitorParameters parameters = parseGetParameters(request, path);
return () -> {
- visitAndWrite(request, parameters, handler, streaming);
+ visitAndWrite(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -582,22 +573,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static class JsonResponse implements AutoCloseable {
private final BufferedContentChannel buffer = new BufferedContentChannel();
- private final OutputStream out;
- private final JsonGenerator json;
+ private final OutputStream out = new ContentChannelOutputStream(buffer);
+ private final JsonGenerator json = jsonFactory.createGenerator(out);
private final ResponseHandler handler;
private ContentChannel channel;
- private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException {
+ private JsonResponse(ResponseHandler handler) throws IOException {
this.handler = handler;
- out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24)
- : new ContentChannelOutputStream(buffer);
- json = jsonFactory.createGenerator(out);
json.writeStartObject();
}
/** Creates a new JsonResponse with path and id fields written. */
static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler, false);
+ JsonResponse response = new JsonResponse(handler);
response.writePathId(path.rawPath());
response.writeDocId(path.id());
return response;
@@ -605,19 +593,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Creates a new JsonResponse with path field written. */
static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException {
- return create(request, handler, false);
- }
-
- /** Creates a new JsonResponse with path field written. */
- static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException {
- JsonResponse response = new JsonResponse(handler, streaming);
+ JsonResponse response = new JsonResponse(handler);
response.writePathId(request.getUri().getRawPath());
return response;
}
/** Creates a new JsonResponse with path and message fields written. */
static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException {
- JsonResponse response = create(request, handler);
+ JsonResponse response = new JsonResponse(handler);
+ response.writePathId(request.getUri().getRawPath());
response.writeMessage(message);
return response;
}
@@ -971,10 +955,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------- Visits ------------------------------------------------
- private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) {
- int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10,
- getProperty(request, WANTED_DOCUMENT_COUNT, integerParser)
- .orElse(streaming ? Integer.MAX_VALUE : 1));
+ private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) {
+ int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1));
if (wantedDocumentCount <= 0)
throw new IllegalArgumentException("wantedDocumentCount must be positive");
@@ -1033,10 +1015,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Called at the start of response rendering. */
default void onStart(JsonResponse response) throws IOException { }
- /** Called for every document received from backend visitors—must call the ack for these to proceed. */
+ /** Called for every document received from backend visitors — must call the ack for these to proceed. */
default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
- /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
+ /** Called at the end of response rendering, before generic status data is written. */
default void onEnd(JsonResponse response) throws IOException { }
}
@@ -1060,7 +1042,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
- visit(request, parameters, false, handler, new VisitCallback() {
+ visit(request, parameters, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
@@ -1097,92 +1079,66 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
- visit(request, parameters, streaming, handler, new VisitCallback() {
- final Deque<Runnable> writes = new ConcurrentLinkedDeque<>();
- final AtomicBoolean writing = new AtomicBoolean();
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() {
@Override public void onStart(JsonResponse response) throws IOException {
- if (streaming)
- response.commit(Response.Status.OK);
-
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- writes.add(() -> {
- response.writeDocumentValue(document);
- ack.run();
- });
- if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing.
- defaultExecutor.execute(() -> {
- for (Runnable write; (write = writes.poll()) != null; write.run());
- writing.set(false);
- });
+ response.writeDocumentValue(document);
+ ack.run();
}
@Override public void onEnd(JsonResponse response) throws IOException {
- // Wait for other writers to complete, then write what remains here.
- while ( ! writing.compareAndSet(false, true)) {
- try {
- Thread.sleep(1);
- }
- catch (InterruptedException e) {
- log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen");
- Thread.currentThread().interrupt();
- }
- }
- for (Runnable write; (write = writes.poll()) != null; write.run());
response.writeArrayEnd();
}
});
}
private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, false, handler, new VisitCallback() { });
+ visit(request, parameters, handler, new VisitCallback() { });
}
- private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
+ private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
try {
- JsonResponse response = JsonResponse.create(request, handler, streaming);
+ JsonResponse response = JsonResponse.create(request, handler);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
- defaultExecutor.execute(() -> {
- super.onDone(code, message);
- loggingException(() -> {
- try (response) {
- callback.onEnd(response);
+ super.onDone(code, message);
+ loggingException(() -> {
+ callback.onEnd(response);
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " +
+ parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
+ response.respond(Response.Status.GATEWAY_TIMEOUT);
+ break;
+ }
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (error.get() == null) {
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
+
+ response.respond(Response.Status.OK);
+ break;
+ }
+ default:
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
if (getVisitorStatistics() != null)
response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
- int status = Response.Status.BAD_GATEWAY;
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
- response.writeMessage("No buckets visited within timeout of " +
- parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
- status = Response.Status.GATEWAY_TIMEOUT;
- break;
- }
- // TODO jonmv: always supply and document continuation?
- case SUCCESS: // Intentional fallthrough.
- case ABORTED: // Intentional fallthrough.
- if (error.get() == null) {
- ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
- if (progress != null && ! progress.isFinished())
- response.writeContinuation(progress.serializeToString());
-
- status = Response.Status.OK;
- break;
- }
- default:
- response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
- }
- if ( ! streaming)
- response.commit(status);
- }
- });
+ response.respond(Response.Status.BAD_GATEWAY);
+ }
+ });
+ visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});