summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-21 17:48:19 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-21 17:48:19 +0200
commit9e2cbe370e498fe0a89c8abae2a8b23344cf3f6a (patch)
treefaf8464e04acf1a943a2b751f3745a2c74c1dc29
parent56c3fc7c2a3b7e317e79593aa56ed2d03472cbde (diff)
Revert "Merge pull request #19686 from vespa-engine/jonmv/revert-streamed-visits"
This reverts commit 56c3fc7c2a3b7e317e79593aa56ed2d03472cbde, reversing changes made to 367dae08c390833a54c1bae11282df5a7e056d16.
-rw-r--r--container-core/abi-spec.json13
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java92
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java80
-rw-r--r--container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java3
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java150
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java78
6 files changed, 275 insertions, 141 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index 02d43104a3f..8c0f3e5fd80 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -762,6 +762,19 @@
],
"fields": []
},
+ "com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream": {
+ "superClass": "com.yahoo.container.jdisc.ContentChannelOutputStream",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(com.yahoo.jdisc.handler.ContentChannel, long)",
+ "public void send(java.nio.ByteBuffer)",
+ "public void flush()"
+ ],
+ "fields": []
+ },
"com.yahoo.container.jdisc.MetricConsumerFactory": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
new file mode 100644
index 00000000000..aec4eeecd7b
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
@@ -0,0 +1,92 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc;
+
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author baldersheim
+ */
+public class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
+
+ private final long maxPending;
+ private final AtomicLong sent = new AtomicLong(0);
+ private final AtomicLong acked = new AtomicLong(0);
+
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
+ super(endpoint);
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletion implements CompletionHandler {
+
+ private final long written;
+ private final AtomicBoolean replied = new AtomicBoolean(false);
+
+ TrackCompletion(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+
+ @Override
+ public void completed() {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ }
+
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ }
+ catch (InterruptedException ignored) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
+ try {
+ send(src, pendingTracker);
+ }
+ catch (Throwable throwable) {
+ pendingTracker.failed(throwable);
+ throw throwable;
+ }
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ try {
+ stallWhilePendingAbove(0);
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
index 0bfe4afe07d..0c3c1e2120b 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
@@ -10,9 +10,6 @@ import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.handler.ResponseHandler;
-import java.io.InterruptedIOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.io.IOException;
@@ -253,81 +250,4 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
}
- /**
- * @author baldersheim
- */
- static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
- private final long maxPending;
- private final AtomicLong sent = new AtomicLong(0);
- private final AtomicLong acked = new AtomicLong(0);
-
- public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
- super(endpoint);
- this.maxPending = maxPending;
- }
-
- private long pendingBytes() {
- return sent.get() - acked.get();
- }
-
- private class TrackCompletion implements CompletionHandler {
-
- private final long written;
- private final AtomicBoolean replied = new AtomicBoolean(false);
-
- TrackCompletion(long written) {
- this.written = written;
- sent.addAndGet(written);
- }
-
- @Override
- public void completed() {
- if ( ! replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- @Override
- public void failed(Throwable t) {
- if ( ! replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
- }
-
- @Override
- public void send(ByteBuffer src) throws IOException {
- try {
- stallWhilePendingAbove(maxPending);
- } catch (InterruptedException ignored) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
- try {
- send(src, pendingTracker);
- } catch (Throwable throwable) {
- pendingTracker.failed(throwable);
- throw throwable;
- }
- }
-
- private void stallWhilePendingAbove(long pending) throws InterruptedException {
- while (pendingBytes() > pending) {
- Thread.sleep(1);
- }
- }
-
- @Override
- public void flush() throws IOException {
- super.flush();
- try {
- stallWhilePendingAbove(0);
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- }
-
- }
-
}
diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
index d1036ce0e45..a9b16799aea 100644
--- a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc;
-import com.yahoo.container.jdisc.ThreadedHttpRequestHandler.MaxPendingContentChannelOutputStream;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.ContainerBuilder;
@@ -20,11 +19,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index aa96b0932c3..a3e3c512dcd 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -8,7 +8,9 @@ import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
+import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentOperation;
@@ -86,10 +88,12 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -158,6 +162,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TIME_CHUNK = "timeChunk";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
+ private static final String STREAM = "stream";
private final Clock clock;
private final Duration handlerTimeout;
@@ -175,10 +180,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
+ private final Executor defaultExecutor;
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
- public DocumentV1ApiHandler(Metric metric,
+ public DocumentV1ApiHandler(ContainerThreadPool threadPool,
+ Metric metric,
MetricReceiver metricReceiver,
VespaDocumentAccess documentAccess,
DocumentmanagerConfig documentManagerConfig,
@@ -186,12 +193,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor());
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
- ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) {
this.clock = clock;
this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
@@ -210,6 +217,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
+ this.defaultExecutor = defaultExecutor;
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -355,9 +363,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
- VisitorParameters parameters = parseGetParameters(request, path);
+ boolean streaming = getProperty(request, STREAM, booleanParser).orElse(false);
+ VisitorParameters parameters = parseGetParameters(request, path, streaming);
return () -> {
- visitAndWrite(request, parameters, handler);
+ visitAndWrite(request, parameters, handler, streaming);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -573,19 +582,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static class JsonResponse implements AutoCloseable {
private final BufferedContentChannel buffer = new BufferedContentChannel();
- private final OutputStream out = new ContentChannelOutputStream(buffer);
- private final JsonGenerator json = jsonFactory.createGenerator(out);
+ private final OutputStream out;
+ private final JsonGenerator json;
private final ResponseHandler handler;
private ContentChannel channel;
- private JsonResponse(ResponseHandler handler) throws IOException {
+ private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException {
this.handler = handler;
+ out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24)
+ : new ContentChannelOutputStream(buffer);
+ json = jsonFactory.createGenerator(out);
json.writeStartObject();
}
/** Creates a new JsonResponse with path and id fields written. */
static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
+ JsonResponse response = new JsonResponse(handler, false);
response.writePathId(path.rawPath());
response.writeDocId(path.id());
return response;
@@ -593,15 +605,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Creates a new JsonResponse with path field written. */
static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
+ return create(request, handler, false);
+ }
+
+ /** Creates a new JsonResponse with path field written. */
+ static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException {
+ JsonResponse response = new JsonResponse(handler, streaming);
response.writePathId(request.getUri().getRawPath());
return response;
}
/** Creates a new JsonResponse with path and message fields written. */
static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
- response.writePathId(request.getUri().getRawPath());
+ JsonResponse response = create(request, handler);
response.writeMessage(message);
return response;
}
@@ -955,8 +971,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------- Visits ------------------------------------------------
- private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path) {
- int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser).orElse(1));
+ private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) {
+ int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10,
+ getProperty(request, WANTED_DOCUMENT_COUNT, integerParser)
+ .orElse(streaming ? Integer.MAX_VALUE : 1));
if (wantedDocumentCount <= 0)
throw new IllegalArgumentException("wantedDocumentCount must be positive");
@@ -1015,10 +1033,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Called at the start of response rendering. */
default void onStart(JsonResponse response) throws IOException { }
- /** Called for every document received from backend visitors — must call the ack for these to proceed. */
+ /** Called for every document received from backend visitors—must call the ack for these to proceed. */
default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
- /** Called at the end of response rendering, before generic status data is written. */
+ /** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
default void onEnd(JsonResponse response) throws IOException { }
}
@@ -1042,7 +1060,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
- visit(request, parameters, handler, new VisitCallback() {
+ visit(request, parameters, false, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
@@ -1079,66 +1097,92 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, handler, new VisitCallback() {
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
+ visit(request, parameters, streaming, handler, new VisitCallback() {
+ final Deque<Runnable> writes = new ConcurrentLinkedDeque<>();
+ final AtomicBoolean writing = new AtomicBoolean();
@Override public void onStart(JsonResponse response) throws IOException {
+ if (streaming)
+ response.commit(Response.Status.OK);
+
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- response.writeDocumentValue(document);
- ack.run();
+ writes.add(() -> {
+ response.writeDocumentValue(document);
+ ack.run();
+ });
+ if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing.
+ defaultExecutor.execute(() -> {
+ for (Runnable write; (write = writes.poll()) != null; write.run());
+ writing.set(false);
+ });
}
@Override public void onEnd(JsonResponse response) throws IOException {
+ // Wait for other writers to complete, then write what remains here.
+ while ( ! writing.compareAndSet(false, true)) {
+ try {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e) {
+ log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen");
+ Thread.currentThread().interrupt();
+ }
+ }
+ for (Runnable write; (write = writes.poll()) != null; write.run());
response.writeArrayEnd();
}
});
}
private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, handler, new VisitCallback() { });
+ visit(request, parameters, false, handler, new VisitCallback() { });
}
- private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
+ private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
try {
- JsonResponse response = JsonResponse.create(request, handler);
+ JsonResponse response = JsonResponse.create(request, handler, streaming);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- loggingException(() -> {
- callback.onEnd(response);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
- response.writeMessage("No buckets visited within timeout of " +
- parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
- response.respond(Response.Status.GATEWAY_TIMEOUT);
- break;
- }
- case SUCCESS: // Intentional fallthrough.
- case ABORTED: // Intentional fallthrough.
- if (error.get() == null) {
- ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
- if (progress != null && ! progress.isFinished())
- response.writeContinuation(progress.serializeToString());
+ defaultExecutor.execute(() -> {
+ super.onDone(code, message);
+ loggingException(() -> {
+ try (response) {
+ callback.onEnd(response);
- if (getVisitorStatistics() != null)
- response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
-
- response.respond(Response.Status.OK);
- break;
- }
- default:
- response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
if (getVisitorStatistics() != null)
response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
- response.respond(Response.Status.BAD_GATEWAY);
- }
- });
- visitDispatcher.execute(() -> {
+ int status = Response.Status.BAD_GATEWAY;
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " +
+ parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
+ status = Response.Status.GATEWAY_TIMEOUT;
+ break;
+ }
+ // TODO jonmv: always supply and document continuation?
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (error.get() == null) {
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+
+ status = Response.Status.OK;
+ break;
+ }
+ default:
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
+ }
+ if ( ! streaming)
+ response.commit(status);
+ }
+ });
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 96700f08823..b23533a720e 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -4,6 +4,7 @@ package com.yahoo.document.restapi.resource;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.container.jdisc.RequestHandlerTestDriver;
import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.document.BucketId;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
@@ -37,6 +38,7 @@ import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
+import com.yahoo.documentapi.VisitorIterator;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
@@ -134,7 +136,8 @@ public class DocumentV1ApiTest {
access = new MockDocumentAccess(docConfig);
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
}
@After
@@ -245,19 +248,83 @@ public class DocumentV1ApiTest {
"}", response.readAll());
assertEquals(200, response.getStatus());
+ // GET at root is a visit. Streaming mode can be specified with &stream=true
+ access.expect(tokens);
+ access.expect(parameters -> {
+ assertEquals("content", parameters.getRoute().toString());
+ assertEquals("default", parameters.getBucketSpace());
+ assertEquals(1025, parameters.getMaxTotalHits()); // Not bounded likewise for streamed responses.
+ assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
+ assertEquals("[id]", parameters.getFieldSet());
+ assertEquals("(all the things)", parameters.getDocumentSelection());
+ assertEquals(6000, parameters.getSessionTimeoutMs());
+ // Put some documents in the response
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
+ VisitorStatistics statistics = new VisitorStatistics();
+ statistics.setBucketsVisited(1);
+ statistics.setDocumentsVisited(3);
+ parameters.getControlHandler().onVisitorStatistics(statistics);
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK");
+ });
+ response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
+ "&selection=all%20the%20things&fieldSet=[id]&timeout=6&stream=true");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1\"," +
+ " \"documents\": [" +
+ " {" +
+ " \"id\": \"id:space:music::one\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Tom Waits\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Asa-Chan & Jun-Ray\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:g=a:three\"," +
+ " \"fields\": {}" +
+ " }" +
+ " ]," +
+ " \"documentCount\": 3" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
// GET with namespace and document type is a restricted visit.
+ ProgressToken progress = new ProgressToken();
+ VisitorIterator.createFromExplicitBucketSet(Set.of(new BucketId(1), new BucketId(2)), 8, progress)
+ .update(new BucketId(1), new BucketId(1));
access.expect(parameters -> {
assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
- assertEquals(new ProgressToken().serializeToString(), parameters.getResumeToken().serializeToString());
+ assertEquals(progress.serializeToString(), parameters.getResumeToken().serializeToString());
throw new IllegalArgumentException("parse failure");
});
- response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString());
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + progress.serializeToString());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid\"," +
" \"message\": \"parse failure\"" +
"}", response.readAll());
assertEquals(400, response.getStatus());
+ // GET when a streamed visit returns status code 200 also when errors occur.
+ access.expect(parameters -> {
+ assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ parameters.getControlHandler().onProgress(progress);
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "failure?");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?stream=true");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"documents\": []," +
+ //" \"continuation\": \"" + progress.serializeToString() + "\"," +
+ " \"message\": \"failure?\"" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
// POST with namespace and document type is a restricted visit with a required destination cluster ("destinationCluster")
access.expect(parameters -> {
fail("Not supposed to run");
@@ -686,7 +753,8 @@ public class DocumentV1ApiTest {
@Test
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;
@@ -737,7 +805,7 @@ public class DocumentV1ApiTest {
replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
return new Result(0);
});
- // Send the rest of the documents. Rely on resender to empty queue of throttled oppperations.
+ // Send the rest of the documents. Rely on resender to empty queue of throttled operations.
for (int i = queueFill; i < docs; i++) {
int j = i;
writer.execute(() -> {