summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2024-04-04 15:38:46 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2024-04-04 15:38:46 +0200
commite34fecf62cf353fa5b0ee86fbdf900b10e103e1b (patch)
treed0727a8a735f1846278f3737ffbc7e28760865c2 /vespaclient-container-plugin
parent95961f1823499edd9f07930332380a2b9c4b1e13 (diff)
- Avoid a Q in the document v1 handler. Rely only on mbus Q.
- This avoid filling a potentially large Q, and allows for more direct and better feedback to client.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java73
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java120
2 files changed, 15 insertions, 178 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index b483d6977d6..594c5c8f398 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -187,17 +187,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Metric metric;
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
- private final long maxThrottled;
- private final long maxThrottledAgeNS;
private final DocumentAccess access;
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
- private final Deque<Operation> operations;
private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
- private final AtomicLong enqueued = new AtomicLong();
private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
- private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@@ -221,16 +216,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.parser = new DocumentOperationParser(documentmanagerConfig);
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
- this.maxThrottled = executorConfig.maxThrottled();
- this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
this.access = access;
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
- this.operations = new ConcurrentLinkedDeque<>();
long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();
// TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
- this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
}
@@ -288,27 +279,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
visits.values().forEach(VisitorSession::abort);
visits.values().forEach(VisitorSession::destroy);
- // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
- dispatcher.shutdown();
+ // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty.
visitDispatcher.shutdown();
- while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
- dispatchEnqueued();
+ while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchVisitEnqueued();
}
- if ( ! operations.isEmpty())
- log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
-
if ( ! visitOperations.isEmpty())
- log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");
+ log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left");
try {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
- if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
- dispatcher.shutdownNow();
-
if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
@@ -552,30 +535,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Dispatches enqueued requests until one is blocked. */
- void dispatchEnqueued() {
- try {
- while (dispatchFirst());
- }
- catch (Exception e) {
- log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
- }
- }
-
- /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
- private boolean dispatchFirst() {
- Operation operation = operations.poll();
- if (operation == null)
- return false;
-
- if (operation.dispatch()) {
- enqueued.decrementAndGet();
- return true;
- }
- operations.push(operation);
- return false;
- }
-
- /** Dispatches enqueued requests until one is blocked. */
void dispatchVisitEnqueued() {
try {
while (dispatchFirstVisit());
@@ -598,36 +557,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
- private long qAgeNS(HttpRequest request) {
- Operation oldest = operations.peek();
- return (oldest != null)
- ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
- : 0;
- }
-
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
- long numQueued = enqueued.incrementAndGet();
- if (numQueued > maxThrottled) {
- enqueued.decrementAndGet();
+ Operation operation = new Operation(request, handler, operationParser);
+ if ( ! operation.dispatch()) {
overload(request, "Rejecting execution due to overload: "
- + maxThrottled + " requests already enqueued", handler);
- return;
- }
- if (numQueued > 1) {
- long ageNS = qAgeNS(request);
- if (ageNS > maxThrottledAgeNS) {
- enqueued.decrementAndGet();
- overload(request, "Rejecting execution due to overload: "
- + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
- return;
- }
+ + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler);
}
- operations.offer(new Operation(request, handler, operationParser));
- dispatchFirst();
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 58cf34712aa..2d0b2de100e 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -71,13 +71,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -190,55 +184,31 @@ public class DocumentV1ApiTest {
}
@Test
- public void testOverLoadBySize() {
+ public void testOverLoad() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// OVERLOAD is a 429
access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
- "}", response3.readAll());
- assertEquals(429, response3.getStatus());
-
- access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
- handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
"}", response1.readAll());
- assertEquals(500, response1.getStatus());
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response2.readAll());
- assertEquals(500, response2.getStatus());
- driver.close();
- }
+ assertEquals(429, response1.getStatus());
- @Test
- public void testOverLoadByAge() {
- RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
- // OVERLOAD is a 429
- access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
- var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- try { Thread.sleep(3_000); } catch (InterruptedException e) {}
- var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" +
+ " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
"}", response2.readAll());
- assertEquals(429, response2.getStatus());
+ assertEquals(429, response1.getStatus());
access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
- handler.dispatchEnqueued();
+ var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response1.readAll());
- assertEquals(500, response1.getStatus());
+ "}", response3.readAll());
+ assertEquals(500, response3.getStatus());
driver.close();
}
@@ -1036,78 +1006,6 @@ public class DocumentV1ApiTest {
});
}
- @Test
- public void testThroughput() throws InterruptedException {
- DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
- handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
- executorConfig, clusterConfig, bucketConfig);
-
- int writers = 4;
- int queueFill = executorConfig.maxThrottled() - writers;
- RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
- ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers);
- ScheduledExecutorService reader = Executors.newScheduledThreadPool(1);
- ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers);
- BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>();
-
- Response success = new Response(0, null, Response.Outcome.SUCCESS);
- int docs = 1 << 14;
- assertTrue(docs >= writers);
- AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>();
-
- CountDownLatch latch = new CountDownLatch(docs);
- reader.execute(() -> {
- while ( ! reader.isShutdown()) {
- try {
- var response = responses.take();
- response.awaitResponse().readAll();
- if (response.getStatus() != 200)
- failed.set(response.getResponse());
- latch.countDown();
- }
- catch (InterruptedException e) { break; }
- }
- });
-
- // Fill the handler resend queue.
- long startNanos = System.nanoTime();
- CountDownLatch setup = new CountDownLatch(queueFill);
- access.session.expect((id, parameters) -> {
- setup.countDown();
- return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR));
- });
- for (int i = 0; i < queueFill; i++) {
- int j = i;
- writer.execute(() -> {
- responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
- POST,
- "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
- });
- }
- setup.await();
-
- // Let "messagebus" start accepting messages.
- access.session.expect((id, parameters) -> {
- replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
- return new Result(0);
- });
- // Send the rest of the documents. Rely on resender to empty queue of throttled operations.
- for (int i = queueFill; i < docs; i++) {
- int j = i;
- writer.execute(() -> {
- responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
- POST,
- "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
- });
- }
- latch.await();
- System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
-
- assertNull(failed.get());
- driver.close();
- }
-
-
static class MockDocumentAccess extends DocumentAccess {
private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
@@ -1223,7 +1121,7 @@ public class DocumentV1ApiTest {
@Override
public double getCurrentWindowSize() {
- throw new AssertionError("Not used");
+ return 20;
}
public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) {