aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-26 13:36:20 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-26 13:36:20 +0200
commit6555fc92200cb906dd7d6f7d226ed490c3b09b91 (patch)
tree0c5a605234672ad17c850bc255d4a0c738b5fea9
parent0d2680649c1c08641ddbf6327e5e7b24eb237e7f (diff)
Revert to previous behaviour, and simply delay ack until doc written
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java62
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java6
2 files changed, 28 insertions, 40 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index d30a75ea920..1e9cdfbbbea 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -9,7 +9,6 @@ import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
-import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentOperation;
@@ -92,7 +91,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -181,12 +179,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
- private final Executor defaultExecutor;
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
- public DocumentV1ApiHandler(Executor defaultExecutor,
- Metric metric,
+ public DocumentV1ApiHandler(Metric metric,
MetricReceiver metricReceiver,
VespaDocumentAccess documentAccess,
DocumentmanagerConfig documentManagerConfig,
@@ -194,12 +190,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, defaultExecutor);
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
- ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, Executor defaultExecutor) {
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
this.clock = clock;
this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
@@ -218,7 +214,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
- this.defaultExecutor = defaultExecutor;
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -582,23 +577,23 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Class for writing and returning JSON responses to document operations in a thread safe manner. */
private static class JsonResponse implements AutoCloseable {
+ private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+
private final BufferedContentChannel buffer = new BufferedContentChannel();
- private final OutputStream out;
+ private final OutputStream out = new ContentChannelOutputStream(buffer);
private final JsonGenerator json;
private final ResponseHandler handler;
private ContentChannel channel;
- private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException {
+ private JsonResponse(ResponseHandler handler) throws IOException {
this.handler = handler;
- out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24)
- : new ContentChannelOutputStream(buffer);
json = jsonFactory.createGenerator(out);
json.writeStartObject();
}
/** Creates a new JsonResponse with path and id fields written. */
static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler, false);
+ JsonResponse response = new JsonResponse(handler);
response.writePathId(path.rawPath());
response.writeDocId(path.id());
return response;
@@ -606,12 +601,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Creates a new JsonResponse with path field written. */
static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException {
- return create(request, handler, false);
- }
-
- /** Creates a new JsonResponse with path field written. */
- static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException {
- JsonResponse response = new JsonResponse(handler, streaming);
+ JsonResponse response = new JsonResponse(handler);
response.writePathId(request.getUri().getRawPath());
return response;
}
@@ -704,8 +694,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
- synchronized void writeDocumentValue(Document document) {
+ synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) {
new JsonWriter(json).write(document);
+ if (completionHandler != null)
+ buffer.write(emptyBuffer, completionHandler);
}
synchronized void writeArrayEnd() throws IOException {
@@ -1107,8 +1099,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
visit(request, parameters, streaming, handler, new VisitCallback() {
- final Deque<Runnable> writes = new ConcurrentLinkedDeque<>();
- final AtomicBoolean writing = new AtomicBoolean();
@Override public void onStart(JsonResponse response) throws IOException {
if (streaming)
response.commit(Response.Status.OK);
@@ -1116,20 +1106,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- writes.add(() -> {
- loggingException(() -> response.writeDocumentValue(document));
- ack.run();
- });
- if (writing.compareAndSet(false, true)) // Occupy only a single thread for writing.
- defaultExecutor.execute(() -> {
- while (writing.get()) {
- for (Runnable write; (write = writes.poll()) != null; write.run());
- writing.set( ! writes.isEmpty());
- }
+ if (streaming)
+ response.writeDocumentValue(document, new CompletionHandler() {
+ @Override public void completed() { ack.run();}
+ @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); }
});
+ else {
+ response.writeDocumentValue(document, null);
+ ack.run();
+ }
}
@Override public void onEnd(JsonResponse response) throws IOException {
- for (Runnable write; (write = writes.poll()) != null; write.run());
response.writeArrayEnd();
}
});
@@ -1141,7 +1128,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
try {
- JsonResponse response = JsonResponse.create(request, handler, streaming);
+ JsonResponse response = JsonResponse.create(request, handler);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
@@ -1164,7 +1151,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
status = Response.Status.GATEWAY_TIMEOUT;
break;
}
- // TODO jonmv: always supply and document continuation?
case SUCCESS: // Intentional fallthrough.
case ABORTED: // Intentional fallthrough.
if (error.get() == null) {
@@ -1182,8 +1168,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
- phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
- visits.remove(this).destroy();
+ visitDispatcher.execute(() -> {
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
+ visits.remove(this).destroy();
+ });
}
};
if (parameters.getRemoteDataHandler() == null) {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 1629777f837..0e62b620828 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -137,7 +137,7 @@ public class DocumentV1ApiTest {
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
- executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
+ executorConfig, clusterConfig, bucketConfig);
}
@After
@@ -183,7 +183,7 @@ public class DocumentV1ApiTest {
}
@Test
- public void testResponses() throws InterruptedException {
+ public void testResponses() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null));
// GET at non-existent path returns 404 with available paths
@@ -756,7 +756,7 @@ public class DocumentV1ApiTest {
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
- executorConfig, clusterConfig, bucketConfig, Executors.newFixedThreadPool(2));
+ executorConfig, clusterConfig, bucketConfig);
int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;