diff options
Diffstat (limited to 'vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java')
-rw-r--r-- | vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java | 120 |
1 files changed, 111 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 2d0b2de100e..58cf34712aa 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -71,7 +71,13 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -184,31 +190,55 @@ public class DocumentV1ApiTest { } @Test - public void testOverLoad() { + public void testOverLoadBySize() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); // OVERLOAD is a 429 access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + + "}", response3.readAll()); + assertEquals(429, response3.getStatus()); + + access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); + handler.dispatchEnqueued(); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + "}", response1.readAll()); - assertEquals(429, response1.getStatus()); + assertEquals(500, response1.getStatus()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + + "}", response2.readAll()); + assertEquals(500, response2.getStatus()); + driver.close(); + } + @Test + public void testOverLoadByAge() { + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + // OVERLOAD is a 429 + access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR))); + var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + try { Thread.sleep(3_000); } catch (InterruptedException e) {} + var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"Rejecting execution due to overload: 20 requests already enqueued\"" + + " \"message\": \"Rejecting execution due to overload: 1.0 seconds worth of work enqueued\"" + "}", response2.readAll()); - assertEquals(429, response1.getStatus()); + assertEquals(429, response2.getStatus()); access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, Result.toError(Result.ResultType.FATAL_ERROR))); - var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two", POST, "{\"fields\": {}}"); + handler.dispatchEnqueued(); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"message\": \"[FATAL_ERROR @ localhost]: FATAL_ERROR\"" + - "}", response3.readAll()); - assertEquals(500, response3.getStatus()); + "}", response1.readAll()); + assertEquals(500, response1.getStatus()); driver.close(); } @@ -1006,6 +1036,78 @@ public class DocumentV1ApiTest { }); } + @Test + public void testThroughput() throws InterruptedException { + DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, + executorConfig, clusterConfig, bucketConfig); + + int writers = 4; + int queueFill = executorConfig.maxThrottled() - writers; + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers); + ScheduledExecutorService reader = Executors.newScheduledThreadPool(1); + ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers); + BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>(); + + Response success = new Response(0, null, Response.Outcome.SUCCESS); + int docs = 1 << 14; + assertTrue(docs >= writers); + AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>(); + + CountDownLatch latch = new CountDownLatch(docs); + reader.execute(() -> { + while ( ! reader.isShutdown()) { + try { + var response = responses.take(); + response.awaitResponse().readAll(); + if (response.getStatus() != 200) + failed.set(response.getResponse()); + latch.countDown(); + } + catch (InterruptedException e) { break; } + } + }); + + // Fill the handler resend queue. + long startNanos = System.nanoTime(); + CountDownLatch setup = new CountDownLatch(queueFill); + access.session.expect((id, parameters) -> { + setup.countDown(); + return new Result(Result.ResultType.TRANSIENT_ERROR, Result.toError(Result.ResultType.TRANSIENT_ERROR)); + }); + for (int i = 0; i < queueFill; i++) { + int j = i; + writer.execute(() -> { + responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, + POST, + "{ \"fields\": { \"artist\": \"Sigrid\" } }")); + }); + } + setup.await(); + + // Let "messagebus" start accepting messages. + access.session.expect((id, parameters) -> { + replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS); + return new Result(0); + }); + // Send the rest of the documents. Rely on resender to empty queue of throttled operations. + for (int i = queueFill; i < docs; i++) { + int j = i; + writer.execute(() -> { + responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j, + POST, + "{ \"fields\": { \"artist\": \"Sigrid\" } }")); + }); + } + latch.await(); + System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + + assertNull(failed.get()); + driver.close(); + } + + static class MockDocumentAccess extends DocumentAccess { private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>(); @@ -1121,7 +1223,7 @@ public class DocumentV1ApiTest { @Override public double getCurrentWindowSize() { - return 20; + throw new AssertionError("Not used"); } public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) { |