diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-06-11 11:30:36 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-06-11 13:45:11 +0200 |
commit | 42852ad470315f3bd3e8da2701f3a2de91d14b95 (patch) | |
tree | 73f897d3238ea77a0510a4dc068bb4b933fc7804 /documentapi | |
parent | 04612f061138bda3e5bff58313a2323dc4005b87 (diff) |
Respect selection and field set parameters, and unit test LocalVisitorSession
Diffstat (limited to 'documentapi')
4 files changed, 270 insertions, 108 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index 202929130c7..c69a8fb48de 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -3,6 +3,7 @@ package com.yahoo.documentapi.local; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; +import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentAccess; @@ -43,8 +44,8 @@ public class LocalDocumentAccess extends DocumentAccess { } @Override - public VisitorSession createVisitorSession(VisitorParameters parameters) { - throw new UnsupportedOperationException("Not supported yet"); + public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { + return new LocalVisitorSession(this, parameters); } @Override diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 022260d275d..1894e32aeb9 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -1,8 +1,16 @@ package com.yahoo.documentapi.local; import com.yahoo.document.Document; +import com.yahoo.document.DocumentGet; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; +import com.yahoo.document.Field; +import com.yahoo.document.fieldset.FieldCollection; +import com.yahoo.document.fieldset.FieldSet; +import com.yahoo.document.fieldset.FieldSetRepo; +import com.yahoo.document.select.DocumentSelector; +import com.yahoo.document.select.Result; +import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.AckToken; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; @@ -15,6 +23,7 @@ import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.messagebus.Trace; import com.yahoo.yolean.Exceptions; +import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; @@ -33,15 +42,20 @@ public class LocalVisitorSession implements VisitorSession { private final VisitorDataHandler data; private final VisitorControlHandler control; private final Map<DocumentId, Document> outstanding; + private final DocumentSelector selector; + private final FieldSet fieldSet; private final AtomicReference<State> state; - public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) { + public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { if (parameters.getResumeToken() != null) throw new UnsupportedOperationException("Continuation via progress tokens is not supported"); if (parameters.getRemoteDataHandler() != null) throw new UnsupportedOperationException("Remote data handlers are not supported"); + this.selector = new DocumentSelector(parameters.getDocumentSelection()); + this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet()); + this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); this.data.reset(); this.data.setSession(this); @@ -50,7 +64,8 @@ public class LocalVisitorSession implements VisitorSession { this.control.reset(); this.control.setSession(this); - this.outstanding = new ConcurrentSkipListMap<>(access.documents); + this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString)); + this.outstanding.putAll(access.documents); this.state = new AtomicReference<>(State.RUNNING); start(); @@ -61,7 +76,15 @@ public class LocalVisitorSession implements VisitorSession { try { // Iterate through all documents and pass on to data handler outstanding.forEach((id, document) -> { - data.onMessage(new PutDocumentMessage(new DocumentPut(document)), + if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + return; + + Document copy = new Document(document.getDataType(), document.getId()); + for (Field field : document.getDataType().getFields()) + if (fieldSet.contains(field)) + copy.setFieldValue(field, document.getFieldValue(field)); + + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), new AckToken(id)); }); // Transition to a terminal state when done diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java new file mode 100644 index 00000000000..6f9e5f23658 --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -0,0 +1,241 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.local; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.document.update.FieldUpdate; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.DocumentAccessParams; +import com.yahoo.documentapi.DocumentResponse; +import com.yahoo.documentapi.DumpVisitorDataHandler; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.SyncSession; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.test.AbstractDocumentApiTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Runs the superclass tests on this implementation + * + * @author bratseth + */ +public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { + + protected LocalDocumentAccess access; + + @Override + protected DocumentAccess access() { + return access; + } + + @Before + public void setUp() { + DocumentAccessParams params = new DocumentAccessParams(); + params.setDocumentManagerConfigId("file:src/test/cfg/documentmanager.cfg"); + access = new LocalDocumentAccess(params); + } + + @After + public void shutdownAccess() { + access.shutdown(); + } + + @Test + public void testNoExceptionFromAsync() { + AsyncSession session = access.createAsyncSession(new AsyncParameters()); + + DocumentType type = access.getDocumentTypeManager().getDocumentType("music"); + DocumentUpdate docUp = new DocumentUpdate(type, new DocumentId("id:ns:music::2")); + + Result result = session.update(docUp); + assertTrue(result.isSuccess()); + Response response = session.getNext(); + assertEquals(result.getRequestId(), response.getRequestId()); + assertFalse(response.isSuccess()); + session.destroy(); + } + + @Test + public void testAsyncFetch() { + AsyncSession session = access.createAsyncSession(new AsyncParameters()); + List<DocumentId> ids = new ArrayList<>(); + ids.add(new DocumentId("id:music:music::1")); + ids.add(new DocumentId("id:music:music::2")); + ids.add(new DocumentId("id:music:music::3")); + for (DocumentId id : ids) + session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); + int timeout = 100; + + long startTime = System.currentTimeMillis(); + Set<Long> outstandingRequests = new HashSet<>(); + for (DocumentId id : ids) { + Result result = session.get(id); + if ( ! result.isSuccess()) + throw new IllegalStateException("Failed requesting document " + id, result.getError().getCause()); + outstandingRequests.add(result.getRequestId()); + } + + List<Document> documents = new ArrayList<>(); + try { + while ( ! outstandingRequests.isEmpty()) { + int timeSinceStart = (int)(System.currentTimeMillis() - startTime); + Response response = session.getNext(timeout - timeSinceStart); + if (response == null) + throw new RuntimeException("Timed out waiting for documents"); // or return what you have + if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore + + if (response.isSuccess()) + documents.add(((DocumentResponse)response).getDocument()); + outstandingRequests.remove(response.getRequestId()); + } + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for documents", e); + } + + assertEquals(3, documents.size()); + for (Document document : documents) + assertNotNull(document); + } + + @Test + public void testFeedingAndVisiting() throws InterruptedException, ParseException { + DocumentType musicType = access().getDocumentTypeManager().getDocumentType("music"); + Document doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one"); + Document doc2 = new Document(musicType, "id:ns:music::2"); doc2.setFieldValue("artist", "two"); + Document doc3 = new Document(musicType, "id:ns:music::3"); + + // Select all music documents where the "artist" field is set + VisitorParameters parameters = new VisitorParameters("music.artist"); + parameters.setFieldSet("music:artist"); + VisitorControlHandler control = new VisitorControlHandler(); + parameters.setControlHandler(control); + List<Document> received = new ArrayList<>(); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { + received.add(doc); + } + @Override public void onRemove(DocumentId id) { + throw new IllegalStateException("Not supposed to get here"); + } + }); + + // Visit when there are no documents completes immediately + access.createVisitorSession(parameters).waitUntilDone(0); + assertSame(VisitorControlHandler.CompletionCode.SUCCESS, + control.getResult().getCode()); + assertEquals(List.of(), + received); + + // Sync-put some documents + SyncSession out = access.createSyncSession(new SyncParameters.Builder().build()); + out.put(new DocumentPut(doc1)); + out.put(new DocumentPut(doc2)); + out.put(new DocumentPut(doc3)); + assertEquals(Map.of(doc1.getId(), doc1, + doc2.getId(), doc2, + doc3.getId(), doc3), + access.documents); + + // Expect a subset of documents to be returned, based on the selection + access.createVisitorSession(parameters).waitUntilDone(0); + assertSame(VisitorControlHandler.CompletionCode.SUCCESS, + control.getResult().getCode()); + assertEquals(List.of(doc1, doc2), + received); + + // Remove doc2 and set artist for doc3, to see changes are reflected in subsequent visits + out.remove(new DocumentRemove(doc2.getId())); + out.update(new DocumentUpdate(musicType, doc3.getId()).addFieldUpdate(FieldUpdate.createAssign(musicType.getField("artist"), + new StringFieldValue("three")))); + assertEquals(Map.of(doc1.getId(), doc1, + doc3.getId(), doc3), + access.documents); + assertEquals("three", + ((StringFieldValue) doc3.getFieldValue("artist")).getString()); + + // Visit the documents again, retrieving none of the document fields + parameters.setFieldSet("[id]"); + received.clear(); + access.createVisitorSession(parameters).waitUntilDone(0); + assertSame(VisitorControlHandler.CompletionCode.SUCCESS, + control.getResult().getCode()); + assertEquals(List.of(new Document(musicType, doc1.getId()), new Document(musicType, doc3.getId())), + received); + + // Visit the documents again, throwing an exception in the data handler on doc3 + received.clear(); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { + if (doc3.getId().equals(doc.getId())) + throw new RuntimeException("SEGFAULT"); + received.add(doc); + } + @Override public void onRemove(DocumentId id) { + throw new IllegalStateException("Not supposed to get here"); + } + }); + access.createVisitorSession(parameters).waitUntilDone(0); + assertSame(VisitorControlHandler.CompletionCode.FAILURE, + control.getResult().getCode()); + assertEquals("SEGFAULT", + control.getResult().getMessage()); + assertEquals(List.of(new Document(musicType, doc1.getId())), + received); + + // Visit the documents again, aborting after the first document + received.clear(); + CountDownLatch visitLatch = new CountDownLatch(1); + CountDownLatch abortLatch = new CountDownLatch(1); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { + received.add(doc); + abortLatch.countDown(); + try { + visitLatch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override public void onRemove(DocumentId id) { throw new IllegalStateException("Not supposed to get here"); } + }); + VisitorSession visit = access.createVisitorSession(parameters); + abortLatch.await(); + control.abort(); + visitLatch.countDown(); + visit.waitUntilDone(0); + assertSame(VisitorControlHandler.CompletionCode.ABORTED, + control.getResult().getCode()); + assertEquals(List.of(new Document(musicType, doc1.getId())), + received); + } + +} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java deleted file mode 100644 index 252bf739951..00000000000 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/test/LocalDocumentApiTestCase.java +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.local.test; - -import com.yahoo.document.*; -import com.yahoo.documentapi.*; -import com.yahoo.documentapi.local.LocalDocumentAccess; -import com.yahoo.documentapi.test.AbstractDocumentApiTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.junit.Assert.*; - -/** - * Runs the superclass tests on this implementation - * - * @author bratseth - */ -public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { - - protected DocumentAccess access; - - @Override - protected DocumentAccess access() { - return access; - } - - @Before - public void setUp() { - DocumentAccessParams params = new DocumentAccessParams(); - params.setDocumentManagerConfigId("file:src/test/cfg/documentmanager.cfg"); - access = new LocalDocumentAccess(params); - } - - @After - public void shutdownAccess() { - access.shutdown(); - } - - @Test - public void testNoExceptionFromAsync() { - AsyncSession session = access.createAsyncSession(new AsyncParameters()); - - DocumentType type = access.getDocumentTypeManager().getDocumentType("music"); - DocumentUpdate docUp = new DocumentUpdate(type, new DocumentId("id:ns:music::2")); - - Result result = session.update(docUp); - assertTrue(result.isSuccess()); - Response response = session.getNext(); - assertEquals(result.getRequestId(), response.getRequestId()); - assertFalse(response.isSuccess()); - session.destroy(); - } - - @Test - public void testAsyncFetch() { - AsyncSession session = access.createAsyncSession(new AsyncParameters()); - List<DocumentId> ids = new ArrayList<>(); - ids.add(new DocumentId("id:music:music::1")); - ids.add(new DocumentId("id:music:music::2")); - ids.add(new DocumentId("id:music:music::3")); - for (DocumentId id : ids) - session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - int timeout = 100; - - long startTime = System.currentTimeMillis(); - Set<Long> outstandingRequests = new HashSet<>(); - for (DocumentId id : ids) { - Result result = session.get(id); - if ( ! result.isSuccess()) - throw new IllegalStateException("Failed requesting document " + id, result.getError().getCause()); - outstandingRequests.add(result.getRequestId()); - } - - List<Document> documents = new ArrayList<>(); - try { - while ( ! outstandingRequests.isEmpty()) { - int timeSinceStart = (int)(System.currentTimeMillis() - startTime); - Response response = session.getNext(timeout - timeSinceStart); - if (response == null) - throw new RuntimeException("Timed out waiting for documents"); // or return what you have - if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore - - if (response.isSuccess()) - documents.add(((DocumentResponse)response).getDocument()); - outstandingRequests.remove(response.getRequestId()); - } - } - catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for documents", e); - } - - assertEquals(3, documents.size()); - for (Document document : documents) - assertNotNull(document); - } - -} |