aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java')
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java120
1 files changed, 111 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 2d0b2de100e..58cf34712aa 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -71,7 +71,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -184,31 +190,55 @@ public class DocumentV1ApiTest {
}
@Test
- public void testOverLoad() {
+ public void testOverLoadBySize() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// OVERLOAD is a 429
access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
+ "}", response3.readAll());
+ assertEquals(429, response3.getStatus());
+
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
+ handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
"}", response1.readAll());
- assertEquals(429, response1.getStatus());
+ assertEquals(500, response1.getStatus());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
+ "}", response2.readAll());
+ assertEquals(500, response2.getStatus());
+ driver.close();
+ }
+ @Test
+ public void testOverLoadByAge() {
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ // OVERLOAD is a 429
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)));
+ var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ try { Thread.sleep(3_000); } catch (InterruptedException e) {}
+ var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" +
+ " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" +
"}", response2.readAll());
- assertEquals(429, response1.getStatus());
+ assertEquals(429, response2.getStatus());
access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR)));
- var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}");
+ handler.dispatchEnqueued();
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" +
- "}", response3.readAll());
- assertEquals(500, response3.getStatus());
+ "}", response1.readAll());
+ assertEquals(500, response1.getStatus());
driver.close();
}
@@ -1006,6 +1036,78 @@ public class DocumentV1ApiTest {
});
}
+ @Test
+ public void testThroughput() throws InterruptedException {
+ DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig,
+ executorConfig, clusterConfig, bucketConfig);
+
+ int writers = 4;
+ int queueFill = executorConfig.maxThrottled() - writers;
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers);
+ ScheduledExecutorService reader = Executors.newScheduledThreadPool(1);
+ ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers);
+ BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>();
+
+ Response success = new Response(0, null, Response.Outcome.SUCCESS);
+ int docs = 1 << 14;
+ assertTrue(docs >= writers);
+ AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>();
+
+ CountDownLatch latch = new CountDownLatch(docs);
+ reader.execute(() -> {
+ while ( ! reader.isShutdown()) {
+ try {
+ var response = responses.take();
+ response.awaitResponse().readAll();
+ if (response.getStatus() != 200)
+ failed.set(response.getResponse());
+ latch.countDown();
+ }
+ catch (InterruptedException e) { break; }
+ }
+ });
+
+ // Fill the handler resend queue.
+ long startNanos = System.nanoTime();
+ CountDownLatch setup = new CountDownLatch(queueFill);
+ access.session.expect((id, parameters) -> {
+ setup.countDown();
+ return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR));
+ });
+ for (int i = 0; i < queueFill; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ setup.await();
+
+ // Let "messagebus" start accepting messages.
+ access.session.expect((id, parameters) -> {
+ replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
+ return new Result(0);
+ });
+ // Send the rest of the documents. Rely on resender to empty queue of throttled operations.
+ for (int i = queueFill; i < docs; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ latch.await();
+ System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+
+ assertNull(failed.get());
+ driver.close();
+ }
+
+
static class MockDocumentAccess extends DocumentAccess {
private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
@@ -1121,7 +1223,7 @@ public class DocumentV1ApiTest {
@Override
public double getCurrentWindowSize() {
- return 20;
+ throw new AssertionError("Not used");
}
public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) {