aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java73
1 files changed, 67 insertions, 6 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 594c5c8f398..b483d6977d6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -187,12 +187,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Metric metric;
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
+ private final long maxThrottled;
+ private final long maxThrottledAgeNS;
private final DocumentAccess access;
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
+ private final Deque<Operation> operations;
private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
+ private final AtomicLong enqueued = new AtomicLong();
private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@@ -216,12 +221,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.parser = new DocumentOperationParser(documentmanagerConfig);
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.maxThrottled = executorConfig.maxThrottled();
+ this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
this.access = access;
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
+ this.operations = new ConcurrentLinkedDeque<>();
long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();
// TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
+ this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
}
@@ -279,19 +288,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
visits.values().forEach(VisitorSession::abort);
visits.values().forEach(VisitorSession::destroy);
- // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty.
+ // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
+ dispatcher.shutdown();
visitDispatcher.shutdown();
- while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+ while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+ dispatchEnqueued();
dispatchVisitEnqueued();
}
+ if ( ! operations.isEmpty())
+ log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
+
if ( ! visitOperations.isEmpty())
- log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left");
+ log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");
try {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
+ dispatcher.shutdownNow();
+
if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
@@ -535,6 +552,30 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Dispatches enqueued requests until one is blocked. */
+ void dispatchEnqueued() {
+ try {
+ while (dispatchFirst());
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
+ }
+ }
+
+ /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
+ private boolean dispatchFirst() {
+ Operation operation = operations.poll();
+ if (operation == null)
+ return false;
+
+ if (operation.dispatch()) {
+ enqueued.decrementAndGet();
+ return true;
+ }
+ operations.push(operation);
+ return false;
+ }
+
+ /** Dispatches enqueued requests until one is blocked. */
void dispatchVisitEnqueued() {
try {
while (dispatchFirstVisit());
@@ -557,16 +598,36 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
+ private long qAgeNS(HttpRequest request) {
+ Operation oldest = operations.peek();
+ return (oldest != null)
+ ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
+ : 0;
+ }
+
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
- Operation operation = new Operation(request, handler, operationParser);
- if ( ! operation.dispatch()) {
+ long numQueued = enqueued.incrementAndGet();
+ if (numQueued > maxThrottled) {
+ enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: "
- + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler);
+ + maxThrottled + " requests already enqueued", handler);
+ return;
+ }
+ if (numQueued > 1) {
+ long ageNS = qAgeNS(request);
+ if (ageNS > maxThrottledAgeNS) {
+ enqueued.decrementAndGet();
+ overload(request, "Rejecting execution due to overload: "
+ + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
+ return;
+ }
}
+ operations.offer(new Operation(request, handler, operationParser));
+ dispatchFirst();
}