diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2024-04-04 11:40:17 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2024-04-04 11:40:17 +0200 |
commit | ee12015e371abc4a5e43b4fa4080e94f4c8a600f (patch) | |
tree | 15961bc5688c87d63f0ec6186df0d44e0daa8447 /vespaclient-container-plugin | |
parent | 800b4beb92627d2faad8681ae9a1f04347731c28 (diff) |
Limit amount of work in DocumentV1 Q to 4096, or 3s.
Having only a fixed length here does not work well when throughput is low.
Diffstat (limited to 'vespaclient-container-plugin')
3 files changed, 79 insertions, 26 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index f8dc1844c15..b483d6977d6 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -188,6 +188,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final DocumentApiMetrics metrics; private final DocumentOperationParser parser; private final long maxThrottled; + private final long maxThrottledAgeNS; private final DocumentAccess access; private final AsyncSession asyncSession; private final Map<String, StorageCluster> clusters; @@ -221,6 +222,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.metric = metric; this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); this.maxThrottled = executorConfig.maxThrottled(); + this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0); this.access = access; this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); @@ -470,7 +472,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { enqueueAndDispatch(request, handler, () -> { ParsedDocumentOperation parsed = parser.parsePut(in, path.id().toString()); DocumentPut put = (DocumentPut)parsed.operation(); - getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(c -> put.setCondition(c)); + getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(put::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) .withResponseHandler(response -> { @@ 
-596,16 +598,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } + private long qAgeNS(HttpRequest request) { + Operation oldest = operations.peek(); + return (oldest != null) + ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime()) + : 0; + } + /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. */ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) { - if (enqueued.incrementAndGet() > maxThrottled) { + long numQueued = enqueued.incrementAndGet(); + if (numQueued > maxThrottled) { enqueued.decrementAndGet(); - overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler); + overload(request, "Rejecting execution due to overload: " + + maxThrottled + " requests already enqueued", handler); return; } + if (numQueued > 1) { + long ageNS = qAgeNS(request); + if (ageNS > maxThrottledAgeNS) { + enqueued.decrementAndGet(); + overload(request, "Rejecting execution due to overload: " + + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler); + return; + } + } operations.offer(new Operation(request, handler, operationParser)); dispatchFirst(); } diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def index d44f664b448..60d66ed36d4 100644 --- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def +++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def @@ -7,3 +7,5 @@ resendDelayMillis int default=10 # Bound on number of document operations to keep in retry queue — further operations are 
rejected maxThrottled int default=4096 +# Max age in seconds of message in throttled Q. +maxThrottledAge double default=3.0 diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 04639db4dac..58cf34712aa 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -113,6 +113,7 @@ public class DocumentV1ApiTest { .build(); final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder() .maxThrottled(2) + .maxThrottledAge(1.0) .resendDelayMillis(1 << 30) .build(); final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd") @@ -189,6 +190,59 @@ public class DocumentV1ApiTest { } @Test + public void testOverLoadBySize() { + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + // OVERLOAD is a 429 + access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); + var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + + "}", response3.readAll()); + assertEquals(429, response3.getStatus()); + + access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, 
Result.toError(Result.ResultType.FATAL_ERROR))); + handler.dispatchEnqueued(); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + "}", response1.readAll()); + assertEquals(500, response1.getStatus()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + "}", response2.readAll()); + assertEquals(500, response2.getStatus()); + driver.close(); + } + + @Test + public void testOverLoadByAge() { + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + // OVERLOAD is a 429 + access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); + var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + try { Thread.sleep(3_000); } catch (InterruptedException e) {} + var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" + + "}", response2.readAll()); + assertEquals(429, response2.getStatus()); + + access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); + handler.dispatchEnqueued(); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + "}", response1.readAll()); + assertEquals(500, response1.getStatus()); + driver.close(); + } + + @Test public void testResponses() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new 
AckToken(null), new AckToken(null)); @@ -923,29 +977,6 @@ public class DocumentV1ApiTest { "}", response.readAll()); assertEquals(405, response.getStatus()); - // OVERLOAD is a 429 - access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); - var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + - "}", response3.readAll()); - assertEquals(429, response3.getStatus()); - access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); - handler.dispatchEnqueued(); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response1.readAll()); - assertEquals(500, response1.getStatus()); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response2.readAll()); - assertEquals(500, response2.getStatus()); - // Request response does not arrive before timeout has passed. AtomicReference<ResponseHandler> handler = new AtomicReference<>(); access.session.expect((id, parameters) -> { |