aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2024-04-16 21:09:52 +0200
committerGitHub <noreply@github.com>2024-04-16 21:09:52 +0200
commit302b8f03d28baef770719f8b73315d78fc6da950 (patch)
tree87b106480a74747ea0faae89470005ca6b733b5c
parent643fe99939f30f62b3c16a6531ba79bab9025b30 (diff)
parent3be23c8101467995923c0a40dc6aa7e510d4b0a1 (diff)
Merge pull request #30929 from vespa-engine/revert-30815-balder/rely-on-mbus-dynamic-qv8.331.34
Revert "- Avoid a Q in the document v1 handler. Rely only on mbus Q."
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java73
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java120
3 files changed, 183 insertions, 16 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java b/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java
index e801873e2cd..6a25937592b 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/HttpRequest.java
@@ -88,7 +88,11 @@ public class HttpRequest extends Request {
this.version = version;
this.remoteAddress = remoteAddress;
this.parameters.putAll(getUriQueryParameters(uri));
- this.connectedAt = (connectedAtMillis != null) ? connectedAtMillis : creationTime(TimeUnit.MILLISECONDS);
+ if (connectedAtMillis != null) {
+ this.connectedAt = connectedAtMillis;
+ } else {
+ this.connectedAt = creationTime(TimeUnit.MILLISECONDS);
+ }
} catch (Throwable e) {
release();
throw e;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 594c5c8f398..b483d6977d6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -187,12 +187,17 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Metric metric;
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
+ private final long maxThrottled;
+ private final long maxThrottledAgeNS;
private final DocumentAccess access;
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
+ private final Deque<Operation> operations;
private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
+ private final AtomicLong enqueued = new AtomicLong();
private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@@ -216,12 +221,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.parser = new DocumentOperationParser(documentmanagerConfig);
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.maxThrottled = executorConfig.maxThrottled();
+ this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
this.access = access;
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
+ this.operations = new ConcurrentLinkedDeque<>();
long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis();
// TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty.
+ this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS);
}
@@ -279,19 +288,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
visits.values().forEach(VisitorSession::abort);
visits.values().forEach(VisitorSession::destroy);
- // Shut down visitor dispatcher, so only we empty the queue of outstanding operations, and can be sure it is empty.
+ // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
+ dispatcher.shutdown();
visitDispatcher.shutdown();
- while ( ! (visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+ while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
+ dispatchEnqueued();
dispatchVisitEnqueued();
}
+ if ( ! operations.isEmpty())
+ log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
+
if ( ! visitOperations.isEmpty())
- log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + visitOperations.size() + " operations left");
+ log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");
try {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
+ dispatcher.shutdownNow();
+
if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
@@ -535,6 +552,30 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Dispatches enqueued requests until one is blocked. */
+ void dispatchEnqueued() {
+ try {
+ while (dispatchFirst());
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
+ }
+ }
+
+ /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
+ private boolean dispatchFirst() {
+ Operation operation = operations.poll();
+ if (operation == null)
+ return false;
+
+ if (operation.dispatch()) {
+ enqueued.decrementAndGet();
+ return true;
+ }
+ operations.push(operation);
+ return false;
+ }
+
+ /** Dispatches enqueued requests until one is blocked. */
void dispatchVisitEnqueued() {
try {
while (dispatchFirstVisit());
@@ -557,16 +598,36 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
+ private long qAgeNS(HttpRequest request) {
+ Operation oldest = operations.peek();
+ return (oldest != null)
+ ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
+ : 0;
+ }
+
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
- Operation operation = new Operation(request, handler, operationParser);
- if ( ! operation.dispatch()) {
+ long numQueued = enqueued.incrementAndGet();
+ if (numQueued > maxThrottled) {
+ enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: "
- + (long)asyncSession.getCurrentWindowSize() + " requests already enqueued", handler);
+ + maxThrottled + " requests already enqueued", handler);
+ return;
+ }
+ if (numQueued > 1) {
+ long ageNS = qAgeNS(request);
+ if (ageNS > maxThrottledAgeNS) {
+ enqueued.decrementAndGet();
+ overload(request, "Rejecting execution due to overload: "
+ + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
+ return;
+ }
}
+ operations.offer(new Operation(request, handler, operationParser));
+ dispatchFirst();
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 2d0b2de100e..58cf34712aa 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -71,7 +71,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -184,31 +190,55 @@ public class DocumentV1ApiTest {
}
@Test
- public void testOverLoad() {
+ public void testOverLoadBySize() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// OVERLOAD is a 429
access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
+ "}", response3.readAll());
+ assertEquals(429, response3.getStatus());
+
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
+ handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
"}", response1.readAll());
- assertEquals(429, response1.getStatus());
+ assertEquals(500, response1.getStatus());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ "}", response2.readAll());
+ assertEquals(500, response2.getStatus());
+ driver.close();
+ }
+ @Test
+ public void testOverLoadByAge() {
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ // OVERLOAD is a 429
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
+ var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ try { Thread.sleep(3_000); } catch (InterruptedException e) {}
+ var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
+ " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" +
"}", response2.readAll());
- assertEquals(429, response1.getStatus());
+ assertEquals(429, response2.getStatus());
access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
- var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response3.readAll());
- assertEquals(500, response3.getStatus());
+ "}", response1.readAll());
+ assertEquals(500, response1.getStatus());
driver.close();
}
@@ -1006,6 +1036,78 @@ public class DocumentV1ApiTest {
});
}
+ @Test
+ public void testThroughput() throws InterruptedException {
+ DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig);
+
+ int writers = 4;
+ int queueFill = executorConfig.maxThrottled() - writers;
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers);
+ ScheduledExecutorService reader = Executors.newScheduledThreadPool(1);
+ ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers);
+ BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>();
+
+ Response success = new Response(0, null, Response.Outcome.SUCCESS);
+ int docs = 1 << 14;
+ assertTrue(docs >= writers);
+ AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>();
+
+ CountDownLatch latch = new CountDownLatch(docs);
+ reader.execute(() -> {
+ while ( ! reader.isShutdown()) {
+ try {
+ var response = responses.take();
+ response.awaitResponse().readAll();
+ if (response.getStatus() != 200)
+ failed.set(response.getResponse());
+ latch.countDown();
+ }
+ catch (InterruptedException e) { break; }
+ }
+ });
+
+ // Fill the handler resend queue.
+ long startNanos = System.nanoTime();
+ CountDownLatch setup = new CountDownLatch(queueFill);
+ access.session.expect((id, parameters) -> {
+ setup.countDown();
+ return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR));
+ });
+ for (int i = 0; i < queueFill; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ setup.await();
+
+ // Let "messagebus" start accepting messages.
+ access.session.expect((id, parameters) -> {
+ replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
+ return new Result(0);
+ });
+ // Send the rest of the documents. Rely on resender to empty queue of throttled operations.
+ for (int i = queueFill; i < docs; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ latch.await();
+ System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+
+ assertNull(failed.get());
+ driver.close();
+ }
+
+
static class MockDocumentAccess extends DocumentAccess {
private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
@@ -1121,7 +1223,7 @@ public class DocumentV1ApiTest {
@Override
public double getCurrentWindowSize() {
- return 20;
+ throw new AssertionError("Not used");
}
public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) {