summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java')
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java406
1 files changed, 406 insertions, 0 deletions
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
new file mode 100644
index 00000000000..1d2f6af35dd
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
@@ -0,0 +1,406 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.application.container.DocumentAccesses;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.local.LocalAsyncSession;
+import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test uses a config definition for the "music" document type, which has a single string field "artist".
+ * One cluster named "content" exists, and can be reached through the "route" route for "music" documents.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutorTest {
+
+ final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder()
+ .cluster("content",
+ new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music",
+ new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace(FixedBucketSpaces.defaultSpace())))
+ .build();
+ final ClusterListConfig clusterConfig = new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder().configid("config-id")
+ .name("content"))
+ .build();
+ final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
+ .resendDelayMillis(10)
+ .defaultTimeoutSeconds(1)
+ .maxThrottled(2)
+ .build();
+ final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content",
+ "config-id",
+ Map.of("music", "route")));
+ final List<Document> received = new ArrayList<>();
+ final List<ErrorType> errors = new ArrayList<>();
+ final List<String> messages = new ArrayList<>();
+ final List<String> tokens = new ArrayList<>();
+ ManualClock clock;
+ LocalDocumentAccess access;
+ DocumentOperationExecutorImpl executor;
+ DocumentType musicType;
+ Document doc1;
+ Document doc2;
+ Document doc3;
+
+ OperationContext operationContext() {
+ return new OperationContext((type, error) -> { errors.add(type); messages.add(error); },
+ document -> document.ifPresent(received::add));
+ }
+
+ VisitOperationsContext visitContext() {
+ return new VisitOperationsContext((type, error) -> { errors.add(type); messages.add(error); },
+ token -> token.ifPresent(tokens::add),
+ received::add);
+ }
+
+ LocalAsyncSession session() {
+ return (LocalAsyncSession) executor.asyncSession();
+ }
+
+ @Before
+ public void setUp() {
+ clock = new ManualClock();
+ access = DocumentAccesses.createFromSchemas("src/test/cfg");
+ executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock);
+ received.clear();
+ errors.clear();
+ tokens.clear();
+
+ musicType = access.getDocumentTypeManager().getDocumentType("music");
+ doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one");
+ doc2 = new Document(musicType, "id:ns:music:n=1:2"); doc2.setFieldValue("artist", "two");
+ doc3 = new Document(musicType, "id:ns:music:g=a:3");
+ }
+
+ @After
+ public void tearDown() {
+ access.shutdown();
+ }
+
+ @Test
+ public void testResolveCluster() {
+ assertEquals("[Storage:cluster=content;clusterconfigid=config-id]",
+ executor.routeToCluster("content"));
+ try {
+ executor.routeToCluster("blargh");
+ fail("Should not find this cluster");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage());
+ }
+ assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name());
+ try {
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of());
+ fail("No clusters should fail");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage());
+ }
+ try {
+ Map<String, StorageCluster> twoClusters = new TreeMap<>();
+ twoClusters.put("one", new StorageCluster("one", "one-config", Map.of()));
+ twoClusters.put("two", new StorageCluster("two", "two-config", Map.of()));
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), twoClusters);
+ fail("More than one cluster and no document type should fail");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testThrottling() throws InterruptedException {
+ executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
+ // Put documents 1 and 2 into backend.
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ assertEquals(List.of(doc1, doc2), received);
+
+ session().setResultType(Result.ResultType.TRANSIENT_ERROR);
+
+ // First two are put on retry queue.
+ executor.get(doc1.getId(), parameters(), operationContext());
+ executor.get(doc2.getId(), parameters(), operationContext());
+ assertEquals(List.of(), errors);
+
+ // Third operation is rejected.
+ executor.get(doc3.getId(), parameters(), operationContext());
+ assertEquals(List.of(OVERLOAD), errors);
+
+ // Maintainer does not yet run.
+ executor.notifyMaintainers();
+ // Third operation is rejected again.
+ executor.get(doc3.getId(), parameters(), operationContext());
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+
+ // Maintainer retries documents, but they're put back into the queue with a new delay.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers();
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+
+ session().setResultType(Result.ResultType.SUCCESS);
+ // Maintainer retries documents again, this time successfully.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers();
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+ assertEquals(List.of(doc1, doc2, doc1, doc2), received);
+ }
+
+ @Test
+ public void testTimeout() throws InterruptedException {
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser);
+ executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
+
+ // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ clock.advance(Duration.ofMillis(20));
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ executor.notifyMaintainers();
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(), received);
+
+ clock.advance(Duration.ofMillis(990));
+ executor.notifyMaintainers(); // Let doc1 time out.
+ phaser.arriveAndAwaitAdvance(); // Let doc2 arrive.
+ phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered.
+ assertEquals(List.of(TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+
+ session().setResultType(Result.ResultType.TRANSIENT_ERROR);
+ executor.put(new DocumentPut(doc3), parameters(), operationContext());
+ clock.advance(Duration.ofMillis(990));
+ executor.notifyMaintainers(); // Retry throttled operation.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers(); // Time out throttled operation.
+ assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+
+ session().setResultType(Result.ResultType.SUCCESS);
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers(); // Retry not attempted since operation already timed out.
+ phaser.arriveAndAwaitAdvance();
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+ }
+
+ @Test
+ public void testCallback() {
+ AtomicBoolean called = new AtomicBoolean();
+ executor.get(doc1.getId(), parameters().withResponseHandler(__ -> called.set(true)), operationContext());
+ assertTrue(called.get());
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(), received);
+ }
+
+ @Test
+ public void testVisit() throws InterruptedException {
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ executor.put(new DocumentPut(doc3), parameters(), operationContext());
+ assertEquals(doc1, received.remove(0));
+ assertEquals(doc2, received.remove(0));
+ assertEquals(doc3, received.remove(0));
+
+ // No cluster or document type set.
+ executor.visit(VisitorOptions.builder()
+ .build(),
+ visitContext());
+ assertEquals("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level", messages.remove(0));
+ assertEquals(BAD_REQUEST, errors.remove(0));
+ assertEquals(List.of(), received);
+
+ // Cluster not found.
+ executor.visit(VisitorOptions.builder()
+ .cluster("blargh")
+ .build(),
+ visitContext());
+ assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", messages.remove(0));
+ assertEquals(BAD_REQUEST, errors.remove(0));
+ assertEquals(List.of(), received);
+
+ // Matches doc2 for user 1.
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .group(Group.of(1))
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions()) {
+ session.waitUntilDone();
+ }
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(doc2, received.remove(0));
+
+ // Matches documents in namespace ns of type music in group a.
+ executor.visit(VisitorOptions.builder()
+ .concurrency(2)
+ .wantedDocumentCount(3)
+ .namespace("ns")
+ .documentType("music")
+ .fieldSet("music:artist")
+ .group(Group.of("a"))
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions())
+ session.waitUntilDone();
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(doc3, received.remove(0));
+
+ // Matches documents with non-empty artist field.
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .selection("music.artist")
+ .fieldSet("[id]")
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions())
+ session.waitUntilDone();
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(doc1.getId(), doc2.getId()), List.of(received.remove(0).getId(), received.remove(0).getId()));
+
+ // Matches all documents, but we'll shut down midway.
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser);
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .bucketSpace("global")
+ .build(),
+ visitContext());
+ phaser.arriveAndAwaitAdvance(); // First document pending
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread shutdownThread = new Thread(() -> {
+ executor.shutdown();
+ latch.countDown();
+ });
+ shutdownThread.start();
+ clock.advance(Duration.ofMillis(100));
+ executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly.
+ latch.await(); // Make sure visit session is shut down before next document is considered.
+ phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above.
+ for (VisitorControlHandler session : executor.visitorSessions()) {
+ session.waitUntilDone();
+ }
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(doc1), received);
+ }
+
+ @Test
+ public void testDelayQueue() throws ExecutionException, InterruptedException, TimeoutException {
+ Supplier<Result> nullOperation = () -> null;
+ AtomicLong counter1 = new AtomicLong(0);
+ AtomicLong counter2 = new AtomicLong(0);
+ AtomicLong counter3 = new AtomicLong(0);
+ AtomicBoolean throttle = new AtomicBoolean(true);
+ OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet());
+ OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet());
+ OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet());
+ DelayQueue queue = new DelayQueue(3,
+ (operation, context) -> {
+ if (throttle.get())
+ return false;
+
+ context.success(Optional.empty());
+ return true;
+ },
+ Duration.ofMillis(30),
+ clock,
+ "test");
+ synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts.
+
+ // Add three operations:
+ //  the first shall be handled by the queue on second attempt,
+ // the second by an external call,and
+ // the third during shutdown — added later.
+ assertTrue(queue.add(nullOperation, context1));
+ clock.advance(Duration.ofMillis(20));
+ assertTrue(queue.add(nullOperation, context2));
+ assertTrue(queue.add(nullOperation, context3));
+ assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3));
+ assertEquals(3, queue.size());
+ assertEquals(0, counter1.get());
+ assertEquals(0, counter2.get());
+ assertEquals(0, counter3.get());
+
+ context2.error(ERROR, "error"); // Marks this as handled, ready to be evicted.
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer does not run yet, as it's not yet time.
+ assertEquals(0, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(3, queue.size());
+
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry.
+ assertEquals(0, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(2, queue.size());
+
+ throttle.set(false);
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry.
+ assertEquals(1, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(1, queue.size());
+
+ queue.shutdown(Duration.ZERO, context -> context.error(ERROR, "shutdown"))
+ .get(1, TimeUnit.SECONDS);
+ assertEquals(1, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(-1, counter3.get());
+ assertEquals(0, queue.size());
+ }
+
+}