aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-29 14:09:45 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-30 13:07:58 +0200
commitf2bc09658995f8bf56fde89638491d67c2bcae59 (patch)
tree11e4f6b80556ba7057651b2f2f82e36a666aa532 /vespaclient-container-plugin
parent18a747185627747b0d137d8d4f06da3024587114 (diff)
Write visited documents in chunks to synchronization point
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java114
1 files changed, 98 insertions, 16 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 20121d8d193..7ca043949a2 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -68,6 +68,7 @@ import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -83,9 +84,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
@@ -577,14 +580,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static class JsonResponse implements AutoCloseable {
private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+ private static final int FLUSH_SIZE = 128;
private final BufferedContentChannel buffer = new BufferedContentChannel();
private final OutputStream out = new ContentChannelOutputStream(buffer);
private final JsonGenerator json;
private final ResponseHandler handler;
+ private final Queue<DocumentBuffer> buffers = new ConcurrentLinkedQueue<>();
+ private final AtomicLong documentsWritten = new AtomicLong();
+ private final AtomicLong documentsFlushed = new AtomicLong();
private boolean documentsDone = false;
+ private boolean first = true;
private ContentChannel channel;
+ private static class DocumentBuffer {
+ private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ private final CompletionHandler completionHandler;
+ private DocumentBuffer(CompletionHandler completionHandler) { this.completionHandler = completionHandler; }
+ }
+
private JsonResponse(ResponseHandler handler) throws IOException {
this.handler = handler;
json = jsonFactory.createGenerator(out);
@@ -695,19 +709,79 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
- synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) {
- if ( ! documentsDone)
- new JsonWriter(json).write(document);
+ /** Writes documents to an internal queue, which is flushed regularly. */
+ void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
+ // Read queue state, and ack early if we're among the first FLUSH_SIZE to do this.
+ long flushed = documentsFlushed.get();
+ long written = documentsWritten.incrementAndGet();
+ if (completionHandler != null && written - flushed <= FLUSH_SIZE) {
+ completionHandler.completed();
+ completionHandler = null;
+ }
- if (completionHandler != null) {
- if ( ! documentsDone)
- buffer.write(emptyBuffer, completionHandler);
- else
- completionHandler.completed();
+ // Serialise document and add to queue, not necessarily in the order dictated by "written" above,
+ // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
+ DocumentBuffer documentBuffer = new DocumentBuffer(completionHandler);
+ documentBuffer.buffer.write(','); // Prepend rather than append, to avoid double memory copying.
+ try (JsonGenerator myJson = jsonFactory.createGenerator(documentBuffer.buffer)) {
+ new JsonWriter(myJson).write(document);
+ }
+ buffers.add(documentBuffer);
+
+ // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
+ if (written % FLUSH_SIZE == 0) {
+ flushDocuments();
+ }
+ }
+
+ synchronized void flushDocuments() throws IOException {
+ for (int i = 0; i < FLUSH_SIZE; i++) {
+ DocumentBuffer db = buffers.poll();
+ if (db == null)
+ break;
+
+ // Pass on any ack's that weren't done early when writing documents.
+ if ( ! documentsDone) {
+ if (first) { // First chunk, remove leading comma from first document, then flush "json" to "buffer".
+ json.flush();
+ buffer.write(ByteBuffer.wrap(db.buffer.toByteArray(), 1, db.buffer.size() - 1), db.completionHandler);
+ first = false;
+ }
+ else {
+ buffer.write(ByteBuffer.wrap(db.buffer.toByteArray()), db.completionHandler);
+ }
+ }
+ else { // Ack any documents we're not writing.
+ if (db.completionHandler != null)
+ db.completionHandler.completed();
+ }
}
+
+ // Ensure the FLUSH_SIZE next documents are ack'ed no later than when these are flushed to the network.
+ // We cannot simply increment the flush counter here, as that would let the number of buffered messages on
+ // their way to the network grow unbounded.
+ buffer.write(emptyBuffer, new CompletionHandler() {
+ @Override public void completed() {
+ documentsFlushed.addAndGet(FLUSH_SIZE);
+ int n = 0;
+ synchronized (JsonResponse.this) {
+ for (DocumentBuffer db : buffers) {
+ if (++n == FLUSH_SIZE)
+ break;
+ if (db.completionHandler != null)
+ db.completionHandler.completed();
+ }
+ }
+ }
+ @Override public void failed(Throwable t) {
+ log.log(WARNING, "Error writing documents", t);
+ completed();
+ }
+ });
}
synchronized void writeArrayEnd() throws IOException {
+ flushDocuments();
documentsDone = true;
json.writeEndArray();
}
@@ -1124,14 +1198,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- if (streamed)
- response.writeDocumentValue(document, new CompletionHandler() {
- @Override public void completed() { ack.run();}
- @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); }
- });
- else {
- response.writeDocumentValue(document, null);
- ack.run();
+ try {
+ if (streamed)
+ response.writeDocumentValue(document, new CompletionHandler() {
+ @Override public void completed() { ack.run();}
+ @Override public void failed(Throwable t) {
+ ack.run();
+ onError.accept(t.getMessage());
+ }
+ });
+ else {
+ response.writeDocumentValue(document, null);
+ ack.run();
+ }
+ }
+ catch (Exception e) {
+ onError.accept(e.getMessage());
}
}
@Override public void onEnd(JsonResponse response) throws IOException {