summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 14:05:28 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-09 14:07:43 +0200
commit8f3f58da7157d9fca170f9370c9fa936100501c3 (patch)
treeab554e09366e53221d9e64fdc2850ead0790d5b6 /vespaclient-container-plugin
parent46c91ff61b279ef89cf41a9b28f921c42c9de464 (diff)
Add a concurrency unit test to /doc/v1 test
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java77
1 files changed, 75 insertions, 2 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index a0f6fd45dd8..49fa15849ce 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -66,11 +66,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Phaser;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -83,6 +85,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -675,6 +678,76 @@ public class DocumentV1ApiTest {
driver.close();
}
+ @Test
+ public void testThroughput() throws InterruptedException {
+ DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
+ handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+
+ int writers = 4;
+ int queueFill = executorConfig.maxThrottled() - writers;
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ ScheduledExecutorService writer = Executors.newScheduledThreadPool(writers);
+ ScheduledExecutorService reader = Executors.newScheduledThreadPool(1);
+ ScheduledExecutorService replier = Executors.newScheduledThreadPool(writers);
+ BlockingQueue<RequestHandlerTestDriver.MockResponseHandler> responses = new LinkedBlockingQueue<>();
+
+ Response success = new Response(0, null, Response.Outcome.SUCCESS);
+ int docs = 1 << 14;
+ assertTrue(docs >= writers);
+ AtomicReference<com.yahoo.jdisc.Response> failed = new AtomicReference<>();
+
+ CountDownLatch latch = new CountDownLatch(docs);
+ reader.execute(() -> {
+ while ( ! reader.isShutdown()) {
+ try {
+ var response = responses.take();
+ response.awaitResponse().readAll();
+ if (response.getStatus() != 200)
+ failed.set(response.getResponse());
+ latch.countDown();
+ }
+ catch (InterruptedException e) { break; }
+ }
+ });
+
+ // Fill the handler resend queue.
+ long startNanos = System.nanoTime();
+ CountDownLatch setup = new CountDownLatch(queueFill);
+ access.session.expect((id, parameters) -> {
+ setup.countDown();
+ return new Result(Result.ResultType.TRANSIENT_ERROR, new Error());
+ });
+ for (int i = 0; i < queueFill; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ setup.await();
+
+ // Let "messagebus" start accepting messages.
+ access.session.expect((id, parameters) -> {
+ replier.schedule(() -> parameters.responseHandler().get().handleResponse(success), 10, TimeUnit.MILLISECONDS);
+ return new Result(0);
+ });
+ // Send the rest of the documents. Rely on resender to empty queue of throttled oppperations.
+ for (int i = queueFill; i < docs; i++) {
+ int j = i;
+ writer.execute(() -> {
+ responses.add(driver.sendRequest("http://localhost/document/v1/ns/music/docid/" + j,
+ POST,
+ "{ \"fields\": { \"artist\": \"Sigrid\" } }"));
+ });
+ }
+ latch.await();
+ System.err.println((System.nanoTime() - startNanos) * 1e-9 + " seconds total");
+
+ assertNull(failed.get());
+ driver.close();
+ }
+
static class MockDocumentAccess extends DocumentAccess {