aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-06-11 11:30:36 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-06-11 13:45:11 +0200
commit42852ad470315f3bd3e8da2701f3a2de91d14b95 (patch)
tree73f897d3238ea77a0510a4dc068bb4b933fc7804 /documentapi
parent04612f061138bda3e5bff58313a2323dc4005b87 (diff)
Respect selection and field set parameters, and unit test LocalVisitorSession
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java5
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java29
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java241
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java103
4 files changed, 270 insertions, 108 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index 202929130c7..c69a8fb48de 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -3,6 +3,7 @@ package com.yahoo.documentapi.local;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
+import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
@@ -43,8 +44,8 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public VisitorSession createVisitorSession(VisitorParameters parameters) {
- throw new UnsupportedOperationException("Not supported yet");
+ public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ return new LocalVisitorSession(this, parameters);
}
@Override
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 022260d275d..1894e32aeb9 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -1,8 +1,16 @@
package com.yahoo.documentapi.local;
import com.yahoo.document.Document;
+import com.yahoo.document.DocumentGet;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
+import com.yahoo.document.Field;
+import com.yahoo.document.fieldset.FieldCollection;
+import com.yahoo.document.fieldset.FieldSet;
+import com.yahoo.document.fieldset.FieldSetRepo;
+import com.yahoo.document.select.DocumentSelector;
+import com.yahoo.document.select.Result;
+import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
@@ -15,6 +23,7 @@ import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.messagebus.Trace;
import com.yahoo.yolean.Exceptions;
+import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,15 +42,20 @@ public class LocalVisitorSession implements VisitorSession {
private final VisitorDataHandler data;
private final VisitorControlHandler control;
private final Map<DocumentId, Document> outstanding;
+ private final DocumentSelector selector;
+ private final FieldSet fieldSet;
private final AtomicReference<State> state;
- public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) {
+ public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
if (parameters.getResumeToken() != null)
throw new UnsupportedOperationException("Continuation via progress tokens is not supported");
if (parameters.getRemoteDataHandler() != null)
throw new UnsupportedOperationException("Remote data handlers are not supported");
+ this.selector = new DocumentSelector(parameters.getDocumentSelection());
+ this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet());
+
this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler();
this.data.reset();
this.data.setSession(this);
@@ -50,7 +64,8 @@ public class LocalVisitorSession implements VisitorSession {
this.control.reset();
this.control.setSession(this);
- this.outstanding = new ConcurrentSkipListMap<>(access.documents);
+ this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString));
+ this.outstanding.putAll(access.documents);
this.state = new AtomicReference<>(State.RUNNING);
start();
@@ -61,7 +76,15 @@ public class LocalVisitorSession implements VisitorSession {
try {
// Iterate through all documents and pass on to data handler
outstanding.forEach((id, document) -> {
- data.onMessage(new PutDocumentMessage(new DocumentPut(document)),
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ return;
+
+ Document copy = new Document(document.getDataType(), document.getId());
+ for (Field field : document.getDataType().getFields())
+ if (fieldSet.contains(field))
+ copy.setFieldValue(field, document.getFieldValue(field));
+
+ data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
new AckToken(id));
});
// Transition to a terminal state when done
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
new file mode 100644
index 00000000000..6f9e5f23658
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -0,0 +1,241 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.local;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentAccessParams;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.SyncParameters;
+import com.yahoo.documentapi.SyncSession;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.test.AbstractDocumentApiTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Runs the superclass tests on this implementation
+ *
+ * @author bratseth
+ */
+public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
+
+ protected LocalDocumentAccess access;
+
+ @Override
+ protected DocumentAccess access() {
+ return access;
+ }
+
+ @Before
+ public void setUp() {
+ DocumentAccessParams params = new DocumentAccessParams();
+ params.setDocumentManagerConfigId("file:src/test/cfg/documentmanager.cfg");
+ access = new LocalDocumentAccess(params);
+ }
+
+ @After
+ public void shutdownAccess() {
+ access.shutdown();
+ }
+
+ @Test
+ public void testNoExceptionFromAsync() {
+ AsyncSession session = access.createAsyncSession(new AsyncParameters());
+
+ DocumentType type = access.getDocumentTypeManager().getDocumentType("music");
+ DocumentUpdate docUp = new DocumentUpdate(type, new DocumentId("id:ns:music::2"));
+
+ Result result = session.update(docUp);
+ assertTrue(result.isSuccess());
+ Response response = session.getNext();
+ assertEquals(result.getRequestId(), response.getRequestId());
+ assertFalse(response.isSuccess());
+ session.destroy();
+ }
+
+ @Test
+ public void testAsyncFetch() {
+ AsyncSession session = access.createAsyncSession(new AsyncParameters());
+ List<DocumentId> ids = new ArrayList<>();
+ ids.add(new DocumentId("id:music:music::1"));
+ ids.add(new DocumentId("id:music:music::2"));
+ ids.add(new DocumentId("id:music:music::3"));
+ for (DocumentId id : ids)
+ session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
+ int timeout = 100;
+
+ long startTime = System.currentTimeMillis();
+ Set<Long> outstandingRequests = new HashSet<>();
+ for (DocumentId id : ids) {
+ Result result = session.get(id);
+ if ( ! result.isSuccess())
+ throw new IllegalStateException("Failed requesting document " + id, result.getError().getCause());
+ outstandingRequests.add(result.getRequestId());
+ }
+
+ List<Document> documents = new ArrayList<>();
+ try {
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeout - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse)response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for documents", e);
+ }
+
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
+ }
+
+ @Test
+ public void testFeedingAndVisiting() throws InterruptedException, ParseException {
+ DocumentType musicType = access().getDocumentTypeManager().getDocumentType("music");
+ Document doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one");
+ Document doc2 = new Document(musicType, "id:ns:music::2"); doc2.setFieldValue("artist", "two");
+ Document doc3 = new Document(musicType, "id:ns:music::3");
+
+ // Select all music documents where the "artist" field is set
+ VisitorParameters parameters = new VisitorParameters("music.artist");
+ parameters.setFieldSet("music:artist");
+ VisitorControlHandler control = new VisitorControlHandler();
+ parameters.setControlHandler(control);
+ List<Document> received = new ArrayList<>();
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) {
+ received.add(doc);
+ }
+ @Override public void onRemove(DocumentId id) {
+ throw new IllegalStateException("Not supposed to get here");
+ }
+ });
+
+ // Visit when there are no documents completes immediately
+ access.createVisitorSession(parameters).waitUntilDone(0);
+ assertSame(VisitorControlHandler.CompletionCode.SUCCESS,
+ control.getResult().getCode());
+ assertEquals(List.of(),
+ received);
+
+ // Sync-put some documents
+ SyncSession out = access.createSyncSession(new SyncParameters.Builder().build());
+ out.put(new DocumentPut(doc1));
+ out.put(new DocumentPut(doc2));
+ out.put(new DocumentPut(doc3));
+ assertEquals(Map.of(doc1.getId(), doc1,
+ doc2.getId(), doc2,
+ doc3.getId(), doc3),
+ access.documents);
+
+ // Expect a subset of documents to be returned, based on the selection
+ access.createVisitorSession(parameters).waitUntilDone(0);
+ assertSame(VisitorControlHandler.CompletionCode.SUCCESS,
+ control.getResult().getCode());
+ assertEquals(List.of(doc1, doc2),
+ received);
+
+ // Remove doc2 and set artist for doc3, to see changes are reflected in subsequent visits
+ out.remove(new DocumentRemove(doc2.getId()));
+ out.update(new DocumentUpdate(musicType, doc3.getId()).addFieldUpdate(FieldUpdate.createAssign(musicType.getField("artist"),
+ new StringFieldValue("three"))));
+ assertEquals(Map.of(doc1.getId(), doc1,
+ doc3.getId(), doc3),
+ access.documents);
+ assertEquals("three",
+ ((StringFieldValue) doc3.getFieldValue("artist")).getString());
+
+ // Visit the documents again, retrieving none of the document fields
+ parameters.setFieldSet("[id]");
+ received.clear();
+ access.createVisitorSession(parameters).waitUntilDone(0);
+ assertSame(VisitorControlHandler.CompletionCode.SUCCESS,
+ control.getResult().getCode());
+ assertEquals(List.of(new Document(musicType, doc1.getId()), new Document(musicType, doc3.getId())),
+ received);
+
+ // Visit the documents again, throwing an exception in the data handler on doc3
+ received.clear();
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) {
+ if (doc3.getId().equals(doc.getId()))
+ throw new RuntimeException("SEGFAULT");
+ received.add(doc);
+ }
+ @Override public void onRemove(DocumentId id) {
+ throw new IllegalStateException("Not supposed to get here");
+ }
+ });
+ access.createVisitorSession(parameters).waitUntilDone(0);
+ assertSame(VisitorControlHandler.CompletionCode.FAILURE,
+ control.getResult().getCode());
+ assertEquals("SEGFAULT",
+ control.getResult().getMessage());
+ assertEquals(List.of(new Document(musicType, doc1.getId())),
+ received);
+
+ // Visit the documents again, aborting after the first document
+ received.clear();
+ CountDownLatch visitLatch = new CountDownLatch(1);
+ CountDownLatch abortLatch = new CountDownLatch(1);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) {
+ received.add(doc);
+ abortLatch.countDown();
+ try {
+ visitLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ @Override public void onRemove(DocumentId id) { throw new IllegalStateException("Not supposed to get here"); }
+ });
+ VisitorSession visit = access.createVisitorSession(parameters);
+ abortLatch.await();
+ control.abort();
+ visitLatch.countDown();
+ visit.waitUntilDone(0);
+ assertSame(VisitorControlHandler.CompletionCode.ABORTED,
+ control.getResult().getCode());
+ assertEquals(List.of(new Document(musicType, doc1.getId())),
+ received);
+ }
+
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java
deleted file mode 100644
index 252bf739951..00000000000
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.local.test;
-
-import com.yahoo.document.*;
-import com.yahoo.documentapi.*;
-import com.yahoo.documentapi.local.LocalDocumentAccess;
-import com.yahoo.documentapi.test.AbstractDocumentApiTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Runs the superclass tests on this implementation
- *
- * @author bratseth
- */
-public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
-
- protected DocumentAccess access;
-
- @Override
- protected DocumentAccess access() {
- return access;
- }
-
- @Before
- public void setUp() {
- DocumentAccessParams params = new DocumentAccessParams();
- params.setDocumentManagerConfigId("file:src/test/cfg/documentmanager.cfg");
- access = new LocalDocumentAccess(params);
- }
-
- @After
- public void shutdownAccess() {
- access.shutdown();
- }
-
- @Test
- public void testNoExceptionFromAsync() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
-
- DocumentType type = access.getDocumentTypeManager().getDocumentType("music");
- DocumentUpdate docUp = new DocumentUpdate(type, new DocumentId("id:ns:music::2"));
-
- Result result = session.update(docUp);
- assertTrue(result.isSuccess());
- Response response = session.getNext();
- assertEquals(result.getRequestId(), response.getRequestId());
- assertFalse(response.isSuccess());
- session.destroy();
- }
-
- @Test
- public void testAsyncFetch() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
- List<DocumentId> ids = new ArrayList<>();
- ids.add(new DocumentId("id:music:music::1"));
- ids.add(new DocumentId("id:music:music::2"));
- ids.add(new DocumentId("id:music:music::3"));
- for (DocumentId id : ids)
- session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- int timeout = 100;
-
- long startTime = System.currentTimeMillis();
- Set<Long> outstandingRequests = new HashSet<>();
- for (DocumentId id : ids) {
- Result result = session.get(id);
- if ( ! result.isSuccess())
- throw new IllegalStateException("Failed requesting document " + id, result.getError().getCause());
- outstandingRequests.add(result.getRequestId());
- }
-
- List<Document> documents = new ArrayList<>();
- try {
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeout - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse)response).getDocument());
- outstandingRequests.remove(response.getRequestId());
- }
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for documents", e);
- }
-
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
- }
-
-}