author    Jon Marius Venstad <venstad@gmail.com>  2021-10-29 14:39:38 +0200
committer Jon Marius Venstad <venstad@gmail.com>  2021-10-30 13:07:58 +0200
commit    17756f9db3c5ec1f7e442b00ad81cfd867fc95e7
tree      7fdd2029bae8e71cc38a0a00a931a7387972b3d5 /vespaclient-container-plugin
parent    f2bc09658995f8bf56fde89638491d67c2bcae59
Simplify with separate queues for acks and docs
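The handler previously kept a single queue of (buffer, completionHandler) pairs and
wove acking into the flush path. Pending acks and serialized documents now sit in two
independent concurrent queues, so the two concerns no longer need to agree on ordering.
In outline (a simplified sketch of the write path, not the handler itself; serialize()
is a stand-in for the JsonWriter round trip to a ByteArrayOutputStream):

    void writeDocumentValue(Document document, CompletionHandler ack) {
        acks.add(ack);
        ackDocuments();          // completes queued acks while within flushed + FLUSH_SIZE
        docs.add(serialize(document));  // serialize() is a stand-in, see above
        if (documentsWritten.incrementAndGet() % FLUSH_SIZE == 0)
            flushDocuments();    // hands up to FLUSH_SIZE documents to the network layer
    }

The invariant is that at most documentsFlushed + FLUSH_SIZE documents are ever acked,
which bounds the amount of buffered data on its way to the network.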
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--  vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java  61
1 file changed, 28 insertions(+), 33 deletions(-)
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 7ca043949a2..4dc22b0586e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -586,9 +586,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final OutputStream out = new ContentChannelOutputStream(buffer);
private final JsonGenerator json;
private final ResponseHandler handler;
- private final Queue<DocumentBuffer> buffers = new ConcurrentLinkedQueue<>();
+ private final Queue<CompletionHandler> acks = new ConcurrentLinkedQueue<>();
+ private final Queue<ByteArrayOutputStream> docs = new ConcurrentLinkedQueue<>();
private final AtomicLong documentsWritten = new AtomicLong();
private final AtomicLong documentsFlushed = new AtomicLong();
+ private final AtomicLong documentsAcked = new AtomicLong();
private boolean documentsDone = false;
private boolean first = true;
private ContentChannel channel;
@@ -711,67 +713,60 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Writes documents to an internal queue, which is flushed regularly. */
void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
- // Read queue state, and ack early if we're among the first FLUSH_SIZE to do this.
- long flushed = documentsFlushed.get();
- long written = documentsWritten.incrementAndGet();
- if (completionHandler != null && written - flushed <= FLUSH_SIZE) {
- completionHandler.completed();
- completionHandler = null;
+ if (completionHandler != null) {
+ acks.add(completionHandler);
+ ackDocuments();
}
// Serialise document and add to queue, not necessarily in the order dictated by "written" above,
// i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
- DocumentBuffer documentBuffer = new DocumentBuffer(completionHandler);
- documentBuffer.buffer.write(','); // Prepend rather than append, to avoid double memory copying.
- try (JsonGenerator myJson = jsonFactory.createGenerator(documentBuffer.buffer)) {
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream(1);
+ myOut.write(','); // Prepend rather than append, to avoid double memory copying.
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
new JsonWriter(myJson).write(document);
}
- buffers.add(documentBuffer);
+ docs.add(myOut);
// Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
- if (written % FLUSH_SIZE == 0) {
+ if (documentsWritten.incrementAndGet() % FLUSH_SIZE == 0) {
flushDocuments();
}
}
+ void ackDocuments() {
+ while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) {
+ CompletionHandler ack = acks.poll();
+ if (ack != null)
+ ack.completed();
+ else
+ break;
+ }
+ documentsAcked.decrementAndGet(); // We overshoot by one above, so decrement again when done.
+ }
+
synchronized void flushDocuments() throws IOException {
for (int i = 0; i < FLUSH_SIZE; i++) {
- DocumentBuffer db = buffers.poll();
- if (db == null)
+ ByteArrayOutputStream doc = docs.poll();
+ if (doc == null)
break;
- // Pass on any ack's that weren't done early when writing documents.
if ( ! documentsDone) {
if (first) { // First chunk, remove leading comma from first document, then flush "json" to "buffer".
json.flush();
- buffer.write(ByteBuffer.wrap(db.buffer.toByteArray(), 1, db.buffer.size() - 1), db.completionHandler);
+ buffer.write(ByteBuffer.wrap(doc.toByteArray(), 1, doc.size() - 1), null);
first = false;
}
else {
- buffer.write(ByteBuffer.wrap(db.buffer.toByteArray()), db.completionHandler);
+ buffer.write(ByteBuffer.wrap(doc.toByteArray()), null);
}
}
- else { // Ack any documents we're not writing.
- if (db.completionHandler != null)
- db.completionHandler.completed();
- }
}
- // Ensure the FLUSH_SIZE next documents are ack'ed no later than when these are flushed to the network.
- // We cannot simply increment the flush counter here, as that would let the number of buffered messages on
- // their way to the network grow unbounded.
+ // Ensure new, eligible acks are done, after flushing these documents.
buffer.write(emptyBuffer, new CompletionHandler() {
@Override public void completed() {
documentsFlushed.addAndGet(FLUSH_SIZE);
- int n = 0;
- synchronized (JsonResponse.this) {
- for (DocumentBuffer db : buffers) {
- if (++n == FLUSH_SIZE)
- break;
- if (db.completionHandler != null)
- db.completionHandler.completed();
- }
- }
+ ackDocuments();
}
@Override public void failed(Throwable t) {
log.log(WARNING, "Error writing documents", t);
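The overshoot-and-decrement in ackDocuments() is the subtle part: the counter is bumped
before each window check, so it must be undone once the loop exits, whether the ack queue
ran dry or the window closed. Below is a minimal, self-contained sketch of just that
logic, under some assumptions: Runnable plays the role of CompletionHandler, the flush
completion is simulated synchronously, and the AckWindowSketch harness is illustrative,
not part of the patch.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicLong;

    /** Stand-alone model of the ack window; Runnable stands in for CompletionHandler. */
    public class AckWindowSketch {

        static final int FLUSH_SIZE = 128;

        final Queue<Runnable> acks = new ConcurrentLinkedQueue<>();
        final AtomicLong documentsFlushed = new AtomicLong();
        final AtomicLong documentsAcked = new AtomicLong();

        /** Completes queued acks while the acked count stays within flushed + FLUSH_SIZE. */
        void ackDocuments() {
            while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) {
                Runnable ack = acks.poll();
                if (ack != null)
                    ack.run();
                else
                    break;
            }
            documentsAcked.decrementAndGet(); // Loop always overshoots by one; undo it here.
        }

        public static void main(String[] args) {
            AckWindowSketch sketch = new AckWindowSketch();
            AtomicLong completed = new AtomicLong();
            for (int i = 0; i < 300; i++) {                // Write 300 documents ...
                sketch.acks.add(completed::incrementAndGet);
                sketch.ackDocuments();
            }
            System.out.println(completed.get());           // ... only the first 128 are acked.
            sketch.documentsFlushed.addAndGet(FLUSH_SIZE); // Simulate one completed network flush,
            sketch.ackDocuments();                         // as the CompletionHandler above does.
            System.out.println(completed.get());           // 256: another FLUSH_SIZE acks released.
        }
    }

Run as-is, this prints 128 and then 256: the first FLUSH_SIZE writes are acked
immediately, and each completed flush releases up to another FLUSH_SIZE acks.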