diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2024-04-04 15:38:46 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2024-04-04 15:38:46 +0200 |
commit | e34fecf62cf353fa5b0ee86fbdf900b10e103e1b (patch) | |
tree | d0727a8a735f1846278f3737ffbc7e28760865c2 /vespaclient-container-plugin | |
parent | 95961f1823499edd9f07930332380a2b9c4b1e13 (diff) |
- Avoid a Q in the document v1 handler. Rely only on mbus Q.
- This avoid filling a potentially large Q, and allows for more direct and better feedback to client.
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 15 insertions, 178 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index b483d6977d6..594c5c8f398 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -187,17 +187,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Metric metric; private final DocumentApiMetrics metrics; private final DocumentOperationParser parser; - private final long maxThrottled; - private final long maxThrottledAgeNS; private final DocumentAccess access; private final AsyncSession asyncSession; private final Map<String, StorageCluster> clusters; - private final Deque<Operation> operations; private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>(); - private final AtomicLong enqueued = new AtomicLong(); private final AtomicLong outstanding = new AtomicLong(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); - private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); private final Map<String, Map<Method, Handler>> handlers = defineApi(); @@ -221,16 +216,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.parser = new DocumentOperationParser(documentmanagerConfig); this.metric = metric; this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); - this.maxThrottled = executorConfig.maxThrottled(); - this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0); this.access = access; this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); - this.operations = new ConcurrentLinkedDeque<>(); long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis(); // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty. - this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); } @@ -288,27 +279,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { visits.values().forEach(VisitorSession::abort); visits.values().forEach(VisitorSession::destroy); - // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. - dispatcher.shutdown(); + // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty. visitDispatcher.shutdown(); - while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { - dispatchEnqueued(); + while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchVisitEnqueued(); } - if ( ! operations.isEmpty()) - log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left"); - if ( ! visitOperations.isEmpty()) - log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left"); + log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left"); try { while (outstanding.get() > 0 && clock.instant().isBefore(doom)) Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); - if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) - dispatcher.shutdownNow(); - if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) visitDispatcher.shutdownNow(); } @@ -552,30 +535,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } /** Dispatches enqueued requests until one is blocked. */ - void dispatchEnqueued() { - try { - while (dispatchFirst()); - } - catch (Exception e) { - log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); - } - } - - /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */ - private boolean dispatchFirst() { - Operation operation = operations.poll(); - if (operation == null) - return false; - - if (operation.dispatch()) { - enqueued.decrementAndGet(); - return true; - } - operations.push(operation); - return false; - } - - /** Dispatches enqueued requests until one is blocked. */ void dispatchVisitEnqueued() { try { while (dispatchFirstVisit()); @@ -598,36 +557,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } - private long qAgeNS(HttpRequest request) { - Operation oldest = operations.peek(); - return (oldest != null) - ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime()) - : 0; - } - /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. */ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) { - long numQueued = enqueued.incrementAndGet(); - if (numQueued > maxThrottled) { - enqueued.decrementAndGet(); + Operation operation = new Operation(request, handler, operationParser); + if ( ! operation.dispatch()) { overload(request, "Rejecting execution due to overload: " - + maxThrottled + " requests already enqueued", handler); - return; - } - if (numQueued > 1) { - long ageNS = qAgeNS(request); - if (ageNS > maxThrottledAgeNS) { - enqueued.decrementAndGet(); - overload(request, "Rejecting execution due to overload: " - + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler); - return; - } + + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler); } - operations.offer(new Operation(request, handler, operationParser)); - dispatchFirst(); } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 58cf34712aa..2d0b2de100e 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -71,13 +71,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -190,55 +184,31 @@ public class DocumentV1ApiTest { } @Test - public void testOverLoadBySize() { + public void testOverLoad() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); // OVERLOAD is a 429 access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + - "}", response3.readAll()); - assertEquals(429, response3.getStatus()); - - access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); - handler.dispatchEnqueued(); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + "}", response1.readAll()); - assertEquals(500, response1.getStatus()); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response2.readAll()); - assertEquals(500, response2.getStatus()); - driver.close(); - } + assertEquals(429, response1.getStatus()); - @Test - public void testOverLoadByAge() { - RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); - // OVERLOAD is a 429 - access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); - var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - try { Thread.sleep(3_000); } catch (InterruptedException e) {} - var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" + + " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + "}", response2.readAll()); - assertEquals(429, response2.getStatus()); + assertEquals(429, response1.getStatus()); access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); - handler.dispatchEnqueued(); + var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response1.readAll()); - assertEquals(500, response1.getStatus()); + "}", response3.readAll()); + assertEquals(500, response3.getStatus()); driver.close(); } @@ -1036,78 +1006,6 @@ public class DocumentV1ApiTest { }); } - @Test - public void testThroughput() throws InterruptedException { - DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); - handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, - executorConfig, clusterConfig, bucketConfig); - - int writers = 4; - int queueFill = executorConfig.maxThrottled() - writers; - RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); - ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers); - ScheduledExecutorService reader = Executors.newScheduledThreadPool(1); - ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers); - BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>(); - - Response success = new Response(0, null, Response.Outcome.SUCCESS); - int docs = 1 << 14; - assertTrue(docs >= writers); - AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>(); - - CountDownLatch latch = new CountDownLatch(docs); - reader.execute(() -> { - while ( ! reader.isShutdown()) { - try { - var response = responses.take(); - response.awaitResponse().readAll(); - if (response.getStatus() != 200) - failed.set(response.getResponse()); - latch.countDown(); - } - catch (InterruptedException e) { break; } - } - }); - - // Fill the handler resend queue. - long startNanos = System.nanoTime(); - CountDownLatch setup = new CountDownLatch(queueFill); - access.session.expect((id, parameters) -> { - setup.countDown(); - return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)); - }); - for (int i = 0; i < queueFill; i++) { - int j = i; - writer.execute(() -> { - responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, - POST, - "{ \"fields\": { \"artist\": \"Sigrid\" } }")); - }); - } - setup.await(); - - // Let "messagebus" start accepting messages. - access.session.expect((id, parameters) -> { - replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS); - return new Result(0); - }); - // Send the rest of the documents. Rely on resender to empty queue of throttled operations. - for (int i = queueFill; i < docs; i++) { - int j = i; - writer.execute(() -> { - responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, - POST, - "{ \"fields\": { \"artist\": \"Sigrid\" } }")); - }); - } - latch.await(); - System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - - assertNull(failed.get()); - driver.close(); - } - - static class MockDocumentAccess extends DocumentAccess { private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>(); @@ -1223,7 +1121,7 @@ public class DocumentV1ApiTest { @Override public double getCurrentWindowSize() { - throw new AssertionError("Not used"); + return 20; } public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) { |