diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 14:01:33 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 14:01:33 +0200 |
commit | 2d1f722b835e2f844058bfffbca66f258129bff1 (patch) | |
tree | 2ea9b4da665166702d9111f2e9427a6b2c031c09 /vespaclient-container-plugin | |
parent | 89098c7afddc62256d3b969c1fdc452e95360038 (diff) |
Also do dispatch when enqueueing
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 59cf6db43ef..445f2d68961 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -280,7 +280,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) { - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { VisitorOptions options = parseOptions(request, path).build(); return () -> { visit(request, options, handler); @@ -291,7 +291,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) { - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { VisitorOptions.Builder optionsBuilder = parseOptions(request, path); optionsBuilder = optionsBuilder.documentType(path.documentType()); optionsBuilder = optionsBuilder.namespace(path.namespace()); @@ -306,7 +306,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) { - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { DocumentOperationParameters rawParameters = parameters(); rawParameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route()) .map(rawParameters::withRoute) @@ -331,7 +331,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); return new ForwardingContentChannel(in -> { - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { DocumentPut put = parser.parsePut(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) @@ -345,7 +345,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); return new ForwardingContentChannel(in -> { - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { DocumentUpdate update = parser.parseUpdate(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent); @@ -359,7 +359,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant()); - enqueue(request, handler, () -> { + enqueueAndDispatch(request, handler, () -> { DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) .orElse(parameters()); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); @@ -371,7 +371,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Dispatches enqueued requests until one is blocked. */ private void dispatchEnqueued() { try { - while (dispatch()); + while (dispatchFirst()); } catch (Exception e) { log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); @@ -379,7 +379,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */ - private boolean dispatch() { + private boolean dispatchFirst() { Operation operation = operations.poll(); if (operation == null) return false; @@ -392,14 +392,23 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } - /** Enqueues the given request and operation, or responds with "overload" if the queue is full. */ - private void enqueue(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) { + /** + * Enqueues the given request and operation, or responds with "overload" if the queue is full, + * and then attempts to dispatch an enqueued operation from the head of the queue. + */ + private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) { if (enqueued.incrementAndGet() > maxThrottled) { enqueued.decrementAndGet(); overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler); return; } - operations.offer(Operation.lazilyParsed(request, handler, operationParser)); + Operation operation = Operation.lazilyParsed(request, handler, operationParser); + if (enqueued.get() == 0 && operation.dispatch()) // Bypass queue if it is empty. + enqueued.decrementAndGet(); + else { + operations.offer(operation); + dispatchFirst(); + } } @FunctionalInterface |