diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-29 14:39:38 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-30 13:07:58 +0200 |
commit | 17756f9db3c5ec1f7e442b00ad81cfd867fc95e7 (patch) | |
tree | 7fdd2029bae8e71cc38a0a00a931a7387972b3d5 | |
parent | f2bc09658995f8bf56fde89638491d67c2bcae59 (diff) |
Simplify with separate queues for acks and docs
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 7ca043949a2..4dc22b0586e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -586,9 +586,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final OutputStream out = new ContentChannelOutputStream(buffer); private final JsonGenerator json; private final ResponseHandler handler; - private final Queue<DocumentBuffer> buffers = new ConcurrentLinkedQueue<>(); + private final Queue<CompletionHandler> acks = new ConcurrentLinkedQueue<>(); + private final Queue<ByteArrayOutputStream> docs = new ConcurrentLinkedQueue<>(); private final AtomicLong documentsWritten = new AtomicLong(); private final AtomicLong documentsFlushed = new AtomicLong(); + private final AtomicLong documentsAcked = new AtomicLong(); private boolean documentsDone = false; private boolean first = true; private ContentChannel channel; @@ -711,67 +713,60 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Writes documents to an internal queue, which is flushed regularly. */ void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException { - // Read queue state, and ack early if we're among the first FLUSH_SIZE to do this. - long flushed = documentsFlushed.get(); - long written = documentsWritten.incrementAndGet(); - if (completionHandler != null && written - flushed <= FLUSH_SIZE) { - completionHandler.completed(); - completionHandler = null; + if (completionHandler != null) { + acks.add(completionHandler); + ackDocuments(); } // Serialise document and add to queue, not necessarily in the order dictated by "written" above, // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early. - DocumentBuffer documentBuffer = new DocumentBuffer(completionHandler); - documentBuffer.buffer.write(','); // Prepend rather than append, to avoid double memory copying. - try (JsonGenerator myJson = jsonFactory.createGenerator(documentBuffer.buffer)) { + ByteArrayOutputStream myOut = new ByteArrayOutputStream(1); + myOut.write(','); // Prepend rather than append, to avoid double memory copying. + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { new JsonWriter(myJson).write(document); } - buffers.add(documentBuffer); + docs.add(myOut); // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled. - if (written % FLUSH_SIZE == 0) { + if (documentsWritten.incrementAndGet() % FLUSH_SIZE == 0) { flushDocuments(); } } + void ackDocuments() { + while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) { + CompletionHandler ack = acks.poll(); + if (ack != null) + ack.completed(); + else + break; + } + documentsAcked.decrementAndGet(); // We overshoot by one above, so decrement again when done. + } + synchronized void flushDocuments() throws IOException { for (int i = 0; i < FLUSH_SIZE; i++) { - DocumentBuffer db = buffers.poll(); - if (db == null) + ByteArrayOutputStream doc = docs.poll(); + if (doc == null) break; - // Pass on any ack's that weren't done early when writing documents. if ( ! documentsDone) { if (first) { // First chunk, remove leading comma from first document, then flush "json" to "buffer". json.flush(); - buffer.write(ByteBuffer.wrap(db.buffer.toByteArray(), 1, db.buffer.size() - 1), db.completionHandler); + buffer.write(ByteBuffer.wrap(doc.toByteArray(), 1, doc.size() - 1), null); first = false; } else { - buffer.write(ByteBuffer.wrap(db.buffer.toByteArray()), db.completionHandler); + buffer.write(ByteBuffer.wrap(doc.toByteArray()), null); } } - else { // Ack any documents we're not writing. - if (db.completionHandler != null) - db.completionHandler.completed(); - } } - // Ensure the FLUSH_SIZE next documents are ack'ed no later than when these are flushed to the network. - // We cannot simply increment the flush counter here, as that would let the number of buffered messages on - // their way to the network grow unbounded. + // Ensure new, eligible acks are done, after flushing these documents. buffer.write(emptyBuffer, new CompletionHandler() { @Override public void completed() { documentsFlushed.addAndGet(FLUSH_SIZE); - int n = 0; - synchronized (JsonResponse.this) { - for (DocumentBuffer db : buffers) { - if (++n == FLUSH_SIZE) - break; - if (db.completionHandler != null) - db.completionHandler.completed(); - } - } + ackDocuments(); } @Override public void failed(Throwable t) { log.log(WARNING, "Error writing documents", t); |