summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-21 08:13:16 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-21 08:13:16 +0200
commit2f39f64ddba3dfe6f0b9dd92d6284f4770003053 (patch)
tree5cdaf2075d70b84060054f6dcb13eba0334ce62b /vespaclient-container-plugin
parentcf9657500b1e59ea63db9189ae31b8f485455560 (diff)
Use a queue of pending writes for each visitor
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java26
1 files changed, 15 insertions, 11 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 833131d581d..711b6388b7a 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -91,6 +91,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -1098,31 +1099,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
+ private void renderDocuments(Deque<Runnable> writes, JsonResponse response, AtomicBoolean done) {
+ synchronized (response) {
+ for (Runnable write; (write = writes.poll()) != null; write.run()) ;
+ if ( ! done.get())
+ visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS);
+ }
+ }
+
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
visit(request, parameters, streaming, handler, new VisitCallback() {
- AtomicLong acks = new AtomicLong();
+ final Deque<Runnable> writes = new ConcurrentLinkedDeque<>();
+ final AtomicBoolean done = new AtomicBoolean(false);
@Override public void onStart(JsonResponse response) throws IOException {
if (streaming)
response.commit(Response.Status.OK);
response.writeDocumentsArrayStart();
+ visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS);
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- acks.incrementAndGet();
- visitRenderer.execute(() -> {
+ writes.add(() -> {
response.writeDocumentValue(document);
ack.run();
- acks.decrementAndGet();
});
}
@Override public void onEnd(JsonResponse response) throws IOException {
- try {
- while (acks.get() > 0)
- Thread.sleep(1);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ done.set(true);
+ renderDocuments(writes, response, done);
response.writeArrayEnd();
}
});