summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2024-04-04 11:40:17 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2024-04-04 11:40:17 +0200
commitee12015e371abc4a5e43b4fa4080e94f4c8a600f (patch)
tree15961bc5688c87d63f0ec6186df0d44e0daa8447 /vespaclient-container-plugin
parent800b4beb92627d2faad8681ae9a1f04347731c28 (diff)
Limit amount of work in DocumentV1 Q to 4096, or 3s.
Having only a fixed length here does no work well when throughput is low.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java26
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def2
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java77
3 files changed, 79 insertions, 26 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index f8dc1844c15..b483d6977d6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -188,6 +188,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
private final long maxThrottled;
+ private final long maxThrottledAgeNS;
private final DocumentAccess access;
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
@@ -221,6 +222,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
this.maxThrottled = executorConfig.maxThrottled();
+ this.maxThrottledAgeNS = (long) (executorConfig.maxThrottledAge() * 1_000_000_000.0);
this.access = access;
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
@@ -470,7 +472,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
enqueueAndDispatch(request, handler, () -> {
ParsedDocumentOperation parsed = parser.parsePut(in, path.id().toString());
DocumentPut put = (DocumentPut)parsed.operation();
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(c -> put.setCondition(c));
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
getProperty(request, CREATE, booleanParser).ifPresent(put::setCreateIfNonExistent);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
.withResponseHandler(response -> {
@@ -596,16 +598,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
+ private long qAgeNS(HttpRequest request) {
+ Operation oldest = operations.peek();
+ return (oldest != null)
+ ? (request.relativeCreatedAtNanoTime() - oldest.request.relativeCreatedAtNanoTime())
+ : 0;
+ }
+
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
- if (enqueued.incrementAndGet() > maxThrottled) {
+ long numQueued = enqueued.incrementAndGet();
+ if (numQueued > maxThrottled) {
enqueued.decrementAndGet();
- overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler);
+ overload(request, "Rejecting execution due to overload: "
+ + maxThrottled + " requests already enqueued", handler);
return;
}
+ if (numQueued > 1) {
+ long ageNS = qAgeNS(request);
+ if (ageNS > maxThrottledAgeNS) {
+ enqueued.decrementAndGet();
+ overload(request, "Rejecting execution due to overload: "
+ + maxThrottledAgeNS / 1_000_000_000.0 + " seconds worth of work enqueued", handler);
+ return;
+ }
+ }
operations.offer(new Operation(request, handler, operationParser));
dispatchFirst();
}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
index d44f664b448..60d66ed36d4 100644
--- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
+++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
@@ -7,3 +7,5 @@ resendDelayMillis int default=10
# Bound on number of document operations to keep in retry queue — further operations are rejected
maxThrottled int default=4096
+# Max age in seconds of message in throttled Q.
+maxThrottledAge double default=3.0
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 04639db4dac..58cf34712aa 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -113,6 +113,7 @@ public class DocumentV1ApiTest {
.build();
final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
.maxThrottled(2)
+ .maxThrottledAge(1.0)
.resendDelayMillis(1 << 30)
.build();
final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd")
@@ -189,6 +190,59 @@ public class DocumentV1ApiTest {
}
@Test
+ public void testOverLoadBySize() {
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ // OVERLOAD is a 429
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
+ var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
+ "}", response3.readAll());
+ assertEquals(429, response3.getStatus());
+
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
+ handler.dispatchEnqueued();
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ "}", response1.readAll());
+ assertEquals(500, response1.getStatus());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ "}", response2.readAll());
+ assertEquals(500, response2.getStatus());
+ driver.close();
+ }
+
+ @Test
+ public void testOverLoadByAge() {
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ // OVERLOAD is a 429
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
+ var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ try { Thread.sleep(3_000); } catch (InterruptedException e) {}
+ var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" +
+ "}", response2.readAll());
+ assertEquals(429, response2.getStatus());
+
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
+ handler.dispatchEnqueued();
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ "}", response1.readAll());
+ assertEquals(500, response1.getStatus());
+ driver.close();
+ }
+
+ @Test
public void testResponses() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null), new AckToken(null));
@@ -923,29 +977,6 @@ public class DocumentV1ApiTest {
"}", response.readAll());
assertEquals(405, response.getStatus());
- // OVERLOAD is a 429
- access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
- var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
- "}", response3.readAll());
- assertEquals(429, response3.getStatus());
- access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
- handler.dispatchEnqueued();
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response1.readAll());
- assertEquals(500, response1.getStatus());
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response2.readAll());
- assertEquals(500, response2.getStatus());
-
// Request response does not arrive before timeout has passed.
AtomicReference<ResponseHandler> handler = new AtomicReference<>();
access.session.expect((id, parameters) -> {