From 3be23c8101467995923c0a40dc6aa7e510d4b0a1 Mon Sep 17 00:00:00 2001
From: Henning Baldersheim
Date: Tue, 16 Apr 2024 11:58:58 +0200
Subject: Revert "- Avoid a Q in the document v1 handler. Rely only on mbus Q."

---
Review notes (ignored by git-am, kept here for the next reader):
 * This is not a byte-for-byte revert any more: the visitor-queue shutdown
   warning in destroy() keeps logging visitOperations.size(). The reverted-to
   code logged operations.size() (the request queue) under the
   "if ( ! visitOperations.isEmpty())" guard, i.e. the wrong queue's size.
   The diffstat below is adjusted accordingly (66/5 instead of 67/6); the
   blob ids on the index line are those of the original patch.
 * Text inside angle brackets (author e-mail, generic type arguments) was
   lost when this patch was extracted and has not been reconstructed.

 .../restapi/resource/DocumentV1ApiHandler.java     | 71 ++++++++++++++++++++--
 1 file changed, 66 insertions(+), 5 deletions(-)

(limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')

diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 594c5c8f398..b483d6977d6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -187,12 +187,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
     private final Metric metric;
     private final DocumentApiMetrics metrics;
     private final DocumentOperationParser parser;
+    private final long maxThrottled;
+    private final long maxThrottledAgeNS;
     private final DocumentAccess access;
     private final AsyncSession asyncSession;
     private final Map clusters;
+    private final Deque operations;
     private final Deque visitOperations = new ConcurrentLinkedDeque<>();
+    private final AtomicLong enqueued = new AtomicLong();
     private final AtomicLong outstanding = new AtomicLong();
     private final Map visits = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
     private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
     private final Map> handlers = defineApi();
 
@@ -216,12 +221,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
         this.parser = new DocumentOperationParser(documentmanagerConfig);
         this.metric = metric;
         this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+        this.maxThrottled = executorConfig.maxThrottled();
+        this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
         this.access = access;
         this.asyncSession = access.createAsyncSession(new AsyncParameters());
         this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
+        this.operations = new ConcurrentLinkedDeque<>();
         long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();
 
         // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
+        this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
         this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
     }
 
@@ -279,19 +288,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
         visits.values().forEach(VisitorSession::abort);
         visits.values().forEach(VisitorSession::destroy);
 
-        // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty.
+        // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
+        dispatcher.shutdown();
         visitDispatcher.shutdown();
-        while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+        while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+            dispatchEnqueued();
             dispatchVisitEnqueued();
         }
 
+        if ( ! operations.isEmpty())
+            log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
+
         if ( ! visitOperations.isEmpty())
             log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left");
 
         try {
             while (outstanding.get() > 0 && clock.instant().isBefore(doom))
                 Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
 
+            if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
+                dispatcher.shutdownNow();
+
             if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
                 visitDispatcher.shutdownNow();
         }
@@ -534,6 +551,30 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
         return parameters;
     }
 
+    /** Dispatches enqueued requests until one is blocked. */
+    void dispatchEnqueued() {
+        try {
+            while (dispatchFirst());
+        }
+        catch (Exception e) {
+            log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
+        }
+    }
+
+    /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
+    private boolean dispatchFirst() {
+        Operation operation = operations.poll();
+        if (operation == null)
+            return false;
+
+        if (operation.dispatch()) {
+            enqueued.decrementAndGet();
+            return true;
+        }
+        operations.push(operation);
+        return false;
+    }
+
     /** Dispatches enqueued requests until one is blocked. */
     void dispatchVisitEnqueued() {
         try {
@@ -557,16 +598,36 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
         return false;
     }
 
+    private long qAgeNS(HttpRequest request) {
+        Operation oldest = operations.peek();
+        return (oldest != null)
+                ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
+                : 0;
+    }
+
     /**
      * Enqueues the given request and operation, or responds with "overload" if the queue is full,
      * and then attempts to dispatch an enqueued operation from the head of the queue.
      */
     private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier operationParser) {
-        Operation operation = new Operation(request, handler, operationParser);
-        if ( ! operation.dispatch()) {
+        long numQueued = enqueued.incrementAndGet();
+        if (numQueued > maxThrottled) {
+            enqueued.decrementAndGet();
             overload(request, "Rejecting execution due to overload: "
-                    + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler);
+                    + maxThrottled + " requests already enqueued", handler);
+            return;
+        }
+        if (numQueued > 1) {
+            long ageNS = qAgeNS(request);
+            if (ageNS > maxThrottledAgeNS) {
+                enqueued.decrementAndGet();
+                overload(request, "Rejecting execution due to overload: "
+                        + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
+                return;
+            }
         }
+        operations.offer(new Operation(request, handler, operationParser));
+        dispatchFirst();
     }
 
 
-- 
cgit v1.2.3