diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-29 14:09:45 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-30 13:07:58 +0200 |
commit | f2bc09658995f8bf56fde89638491d67c2bcae59 (patch) | |
tree | 11e4f6b80556ba7057651b2f2f82e36a666aa532 /vespaclient-container-plugin/src/main/java/com/yahoo/document | |
parent | 18a747185627747b0d137d8d4f06da3024587114 (diff) |
Write visited documents in chunks to synchronization point
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 114 |
1 files changed, 98 insertions, 16 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 20121d8d193..7ca043949a2 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -68,6 +68,7 @@ import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
 import com.yahoo.yolean.Exceptions;
 import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -83,9 +84,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
@@ -577,14 +580,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
     private static class JsonResponse implements AutoCloseable {
 
         private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+        private static final int FLUSH_SIZE = 128;
 
         private final BufferedContentChannel buffer = new BufferedContentChannel();
         private final OutputStream out = new ContentChannelOutputStream(buffer);
         private final JsonGenerator json;
         private final ResponseHandler handler;
+        private final Queue<DocumentBuffer> buffers = new ConcurrentLinkedQueue<>();
+        private final AtomicLong documentsWritten = new AtomicLong();
+        private final AtomicLong documentsFlushed = new AtomicLong();
         private boolean documentsDone = false;
+        private boolean first = true;
         private ContentChannel channel;
 
+        private static class DocumentBuffer {
+            private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            private final CompletionHandler completionHandler;
+            private DocumentBuffer(CompletionHandler completionHandler) { this.completionHandler = completionHandler; }
+        }
+
         private JsonResponse(ResponseHandler handler) throws IOException {
             this.handler = handler;
             json = jsonFactory.createGenerator(out);
@@ -695,19 +709,79 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
             json.writeArrayFieldStart("documents");
         }
 
-        synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) {
-            if ( ! documentsDone)
-                new JsonWriter(json).write(document);
+        /** Writes documents to an internal queue, which is flushed regularly. */
+        void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
+            // Read queue state, and ack early if we're among the first FLUSH_SIZE to do this.
+            long flushed = documentsFlushed.get();
+            long written = documentsWritten.incrementAndGet();
+            if (completionHandler != null && written - flushed <= FLUSH_SIZE) {
+                completionHandler.completed();
+                completionHandler = null;
+            }
 
-            if (completionHandler != null) {
-                if ( ! documentsDone)
-                    buffer.write(emptyBuffer, completionHandler);
-                else
-                    completionHandler.completed();
+            // Serialise document and add to queue, not necessarily in the order dictated by "written" above,
+            // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
+            DocumentBuffer documentBuffer = new DocumentBuffer(completionHandler);
+            documentBuffer.buffer.write(','); // Prepend rather than append, to avoid double memory copying.
+            try (JsonGenerator myJson = jsonFactory.createGenerator(documentBuffer.buffer)) {
+                new JsonWriter(myJson).write(document);
+            }
+            buffers.add(documentBuffer);
+
+            // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
+            if (written % FLUSH_SIZE == 0) {
+                flushDocuments();
+            }
+        }
+
+        synchronized void flushDocuments() throws IOException {
+            for (int i = 0; i < FLUSH_SIZE; i++) {
+                DocumentBuffer db = buffers.poll();
+                if (db == null)
+                    break;
+
+                // Pass on any ack's that weren't done early when writing documents.
+                if ( ! documentsDone) {
+                    if (first) { // First chunk, remove leading comma from first document, then flush "json" to "buffer".
+                        json.flush();
+                        buffer.write(ByteBuffer.wrap(db.buffer.toByteArray(), 1, db.buffer.size() - 1), db.completionHandler);
+                        first = false;
+                    }
+                    else {
+                        buffer.write(ByteBuffer.wrap(db.buffer.toByteArray()), db.completionHandler);
+                    }
+                }
+                else { // Ack any documents we're not writing.
+                    if (db.completionHandler != null)
+                        db.completionHandler.completed();
+                }
             }
+
+            // Ensure the FLUSH_SIZE next documents are ack'ed no later than when these are flushed to the network.
+            // We cannot simply increment the flush counter here, as that would let the number of buffered messages on
+            // their way to the network grow unbounded.
+            buffer.write(emptyBuffer, new CompletionHandler() {
+                @Override public void completed() {
+                    documentsFlushed.addAndGet(FLUSH_SIZE);
+                    int n = 0;
+                    synchronized (JsonResponse.this) {
+                        for (DocumentBuffer db : buffers) {
+                            if (++n == FLUSH_SIZE)
+                                break;
+                            if (db.completionHandler != null)
+                                db.completionHandler.completed();
+                        }
+                    }
+                }
+                @Override public void failed(Throwable t) {
+                    log.log(WARNING, "Error writing documents", t);
+                    completed();
+                }
+            });
         }
 
         synchronized void writeArrayEnd() throws IOException {
+            flushDocuments();
             documentsDone = true;
             json.writeEndArray();
         }
@@ -1124,14 +1198,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
                 response.writeDocumentsArrayStart();
             }
             @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
-                if (streamed)
-                    response.writeDocumentValue(document, new CompletionHandler() {
-                        @Override public void completed() { ack.run();}
-                        @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); }
-                    });
-                else {
-                    response.writeDocumentValue(document, null);
-                    ack.run();
+                try {
+                    if (streamed)
+                        response.writeDocumentValue(document, new CompletionHandler() {
+                            @Override public void completed() { ack.run();}
+                            @Override public void failed(Throwable t) {
+                                ack.run();
+                                onError.accept(t.getMessage());
+                            }
+                        });
+                    else {
+                        response.writeDocumentValue(document, null);
+                        ack.run();
+                    }
+                }
+                catch (Exception e) {
+                    onError.accept(e.getMessage());
                 }
             }
             @Override public void onEnd(JsonResponse response) throws IOException {