summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-13 14:01:33 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-10-13 14:01:33 +0200
commit2d1f722b835e2f844058bfffbca66f258129bff1 (patch)
tree2ea9b4da665166702d9111f2e9427a6b2c031c09 /vespaclient-container-plugin
parent89098c7afddc62256d3b969c1fdc452e95360038 (diff)
Also do dispatch when enqueueing
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java31
1 files changed, 20 insertions, 11 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 59cf6db43ef..445f2d68961 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -280,7 +280,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
VisitorOptions options = parseOptions(request, path).build();
return () -> {
visit(request, options, handler);
@@ -291,7 +291,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
VisitorOptions.Builder optionsBuilder = parseOptions(request, path);
optionsBuilder = optionsBuilder.documentType(path.documentType());
optionsBuilder = optionsBuilder.namespace(path.namespace());
@@ -306,7 +306,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
DocumentOperationParameters rawParameters = parameters();
rawParameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route())
.map(rawParameters::withRoute)
@@ -331,7 +331,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant());
return new ForwardingContentChannel(in -> {
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
DocumentPut put = parser.parsePut(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
@@ -345,7 +345,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant());
return new ForwardingContentChannel(in -> {
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
DocumentUpdate update = parser.parseUpdate(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
@@ -359,7 +359,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant());
- enqueue(request, handler, () -> {
+ enqueueAndDispatch(request, handler, () -> {
DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
.orElse(parameters());
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
@@ -371,7 +371,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Dispatches enqueued requests until one is blocked. */
private void dispatchEnqueued() {
try {
- while (dispatch());
+ while (dispatchFirst());
}
catch (Exception e) {
log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
@@ -379,7 +379,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
- private boolean dispatch() {
+ private boolean dispatchFirst() {
Operation operation = operations.poll();
if (operation == null)
return false;
@@ -392,14 +392,23 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
- /** Enqueues the given request and operation, or responds with "overload" if the queue is full. */
- private void enqueue(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) {
+ /**
+ * Enqueues the given request and operation, or responds with "overload" if the queue is full,
+ * and then attempts to dispatch an enqueued operation from the head of the queue.
+ */
+ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) {
if (enqueued.incrementAndGet() > maxThrottled) {
enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler);
return;
}
- operations.offer(Operation.lazilyParsed(request, handler, operationParser));
+ Operation operation = Operation.lazilyParsed(request, handler, operationParser);
+ if (enqueued.get() == 0 && operation.dispatch()) // Bypass queue if it is empty.
+ enqueued.decrementAndGet();
+ else {
+ operations.offer(operation);
+ dispatchFirst();
+ }
}
@FunctionalInterface