diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 08:13:16 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-21 08:13:16 +0200 |
commit | 2f39f64ddba3dfe6f0b9dd92d6284f4770003053 (patch) | |
tree | 5cdaf2075d70b84060054f6dcb13eba0334ce62b /vespaclient-container-plugin | |
parent | cf9657500b1e59ea63db9189ae31b8f485455560 (diff) |
Use a queue of pending writes for each visitor
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 833131d581d..711b6388b7a 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -91,6 +91,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -1098,31 +1099,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } + private void renderDocuments(Deque<Runnable> writes, JsonResponse response, AtomicBoolean done) { + synchronized (response) { + for (Runnable write; (write = writes.poll()) != null; write.run()) ; + if ( ! done.get()) + visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS); + } + } + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { visit(request, parameters, streaming, handler, new VisitCallback() { - AtomicLong acks = new AtomicLong(); + final Deque<Runnable> writes = new ConcurrentLinkedDeque<>(); + final AtomicBoolean done = new AtomicBoolean(false); @Override public void onStart(JsonResponse response) throws IOException { if (streaming) response.commit(Response.Status.OK); response.writeDocumentsArrayStart(); + visitRenderer.schedule(() -> renderDocuments(writes, response, done), 10, TimeUnit.MILLISECONDS); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - acks.incrementAndGet(); - visitRenderer.execute(() -> { + writes.add(() -> { response.writeDocumentValue(document); ack.run(); - acks.decrementAndGet(); }); } @Override public void onEnd(JsonResponse response) throws IOException { - try { - while (acks.get() > 0) - Thread.sleep(1); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + done.set(true); + renderDocuments(writes, response, done); response.writeArrayEnd(); } }); |