aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2016-07-28 12:35:47 +0200
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2016-07-28 12:35:47 +0200
commit893bb85f4517c3c8ea6c6b00315eb76df2f4e949 (patch)
tree260efd6b619a91bece5d5d48971d36da6288bd89 /documentapi
parentf189b3d79f02347006db5753d00f4ad0bbf2208c (diff)
Let VisitorDataQueue handle correct wire message types from visitors
Also add tests for basic (de)queuing functionality, since none existed for this before.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java3
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java31
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java129
3 files changed, 161 insertions, 2 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java
index 4cee27a9fda..ea261622bd9 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java
@@ -85,8 +85,9 @@ public abstract class VisitorDataHandler {
/**
* Called when a data message is received.
*
+ * IMPORTANT:
* May be called concurrently from multiple threads. Any internal state
- * mutations must be done in a thread-safe manner.
+ * mutations MUST be done in a thread-safe manner.
*
* @param m The message received
* @param token A token to reply with when finished processing the message.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
index 5e65ee534ba..993638bb4b8 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
@@ -2,9 +2,13 @@
package com.yahoo.documentapi;
import com.yahoo.document.BucketId;
+import com.yahoo.document.DocumentOperation;
import com.yahoo.documentapi.messagebus.protocol.DocumentListEntry;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.messagebus.Message;
import com.yahoo.vdslib.DocumentList;
+import com.yahoo.vdslib.Entry;
import java.util.LinkedList;
import java.util.List;
@@ -30,6 +34,7 @@ public class VisitorDataQueue extends VisitorDataHandler {
}
// Inherit doc from VisitorDataHandler
+ @Override
public void reset() {
super.reset();
synchronized (pendingResponses) {
@@ -37,10 +42,32 @@ public class VisitorDataQueue extends VisitorDataHandler {
}
}
+ private void appendSingleOpToPendingList(final DocumentOperation op, final AckToken token) {
+ final DocumentList docList = DocumentList.create(Entry.create(op));
+ final DocumentListVisitorResponse response = new DocumentListVisitorResponse(docList, token);
+ synchronized (pendingResponses) {
+ pendingResponses.add(response);
+ pendingResponses.notifyAll();
+ }
+ }
+
+ @Override
public void onMessage(Message m, AckToken token) {
+ if (m instanceof PutDocumentMessage) {
+ appendSingleOpToPendingList(((PutDocumentMessage)m).getDocumentPut(), token);
+ } else if (m instanceof RemoveDocumentMessage) {
+ appendSingleOpToPendingList(((RemoveDocumentMessage)m).getDocumentRemove(), token);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Expected put/remove message, got '%s' of type %s",
+ m.toString(), m.getClass().toString()));
+ }
}
- // Inherit doc from VisitorDataHandler
+ /**
+ * @deprecated This method is no longer called by the visitor subsystem. See onMessage instead.
+ */
+ @Deprecated
public void onDocuments(DocumentList docs, AckToken token) {
synchronized (pendingResponses) {
pendingResponses.add(new DocumentListVisitorResponse(docs, token));
@@ -49,6 +76,7 @@ public class VisitorDataQueue extends VisitorDataHandler {
}
// Inherit doc from VisitorDataHandler
+ @Override
public VisitorResponse getNext() {
synchronized (pendingResponses) {
return (pendingResponses.isEmpty()
@@ -57,6 +85,7 @@ public class VisitorDataQueue extends VisitorDataHandler {
}
// Inherit doc from VisitorDataHandler
+ @Override
public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException {
synchronized (pendingResponses) {
if (pendingResponses.isEmpty()) {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java
new file mode 100644
index 00000000000..6b2533cc488
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java
@@ -0,0 +1,129 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.vdslib.Entry;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class VisitorDataQueueTest {
+
+ private final DocumentTypeManager docMan = new DocumentTypeManager();
+
+ public VisitorDataQueueTest() {
+ DocumentTypeManagerConfigurer.configure(docMan, "file:./test/cfg/testdoc.cfg");
+ }
+
+ private PutDocumentMessage createPutMessage(final String docId) {
+ return new PutDocumentMessage(new DocumentPut(new Document(docMan.getDocumentType("testdoc"), docId)));
+ }
+
+ private RemoveDocumentMessage createRemoveMessage(final String docId) {
+ return new RemoveDocumentMessage(new DocumentId(docId));
+ }
+
+ private AckToken createDummyAckToken() {
+ return new AckToken(new Object());
+ }
+
+ private void assertNonNullDocumentListResponse(final VisitorResponse response) {
+ assertThat(response, notNullValue());
+ assertThat(response, instanceOf(DocumentListVisitorResponse.class));
+ }
+
+ private void assertResponseHasSinglePut(final VisitorResponse response, final DocumentPut expectedInstance) {
+ assertNonNullDocumentListResponse(response);
+ final DocumentListVisitorResponse visitorResponse = (DocumentListVisitorResponse)response;
+ assertThat(visitorResponse.getDocumentList().size(), equalTo(1));
+ final Entry entry = visitorResponse.getDocumentList().get(0);
+ assertThat(entry.getDocumentOperation(), is(expectedInstance));
+ }
+
+ @Test
+ public void received_put_can_be_polled_via_non_timeout_getter() {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ final PutDocumentMessage putMessage = createPutMessage("id:foo:testdoc::foo");
+ queue.onMessage(putMessage, createDummyAckToken());
+ final VisitorResponse response = queue.getNext();
+
+ assertResponseHasSinglePut(response, putMessage.getDocumentPut());
+ assertThat(queue.getNext(), nullValue()); // Queue now empty
+ }
+
+ @Test
+ public void received_put_can_be_polled_via_timeout_getter() throws InterruptedException {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ final PutDocumentMessage putMessage = createPutMessage("id:foo:testdoc::foo");
+ queue.onMessage(putMessage, createDummyAckToken());
+ final VisitorResponse response = queue.getNext(1000);
+
+ assertResponseHasSinglePut(response, putMessage.getDocumentPut());
+ assertThat(queue.getNext(), nullValue()); // Queue now empty
+ }
+
+ private void assertResponseHasSingleRemove(final VisitorResponse response, final String docId) {
+ assertNonNullDocumentListResponse(response);
+ final DocumentListVisitorResponse visitorResponse = (DocumentListVisitorResponse)response;
+ assertThat(visitorResponse.getDocumentList().size(), equalTo(1));
+ final Entry entry = visitorResponse.getDocumentList().get(0);
+ assertThat(entry.isRemoveEntry(), is(true));
+ assertThat(entry.getDocumentOperation(), instanceOf(DocumentRemove.class));
+ assertThat(entry.getDocumentOperation().getId(), equalTo(new DocumentId(docId)));
+ }
+
+ @Test
+ public void received_remove_can_be_polled_via_non_timeout_getter() {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken());
+ final VisitorResponse response = queue.getNext();
+
+ assertResponseHasSingleRemove(response, "id:foo:testdoc::bar");
+ }
+
+ @Test
+ public void received_remove_can_be_polled_via_non_getter() throws InterruptedException {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken());
+ final VisitorResponse response = queue.getNext(1000);
+
+ assertResponseHasSingleRemove(response, "id:foo:testdoc::bar");
+ }
+
+ @Test
+ public void multiple_messages_are_enqueued_and_dequeued_in_fifo_order() {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ final PutDocumentMessage firstPut = createPutMessage("id:foo:testdoc::foo");
+ final PutDocumentMessage secondPut = createPutMessage("id:foo:testdoc::baz");
+
+ queue.onMessage(firstPut, createDummyAckToken());
+ queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken());
+ queue.onMessage(secondPut, createDummyAckToken());
+ queue.onMessage(createRemoveMessage("id:foo:testdoc::fleeb"), createDummyAckToken());
+
+ assertResponseHasSinglePut(queue.getNext(), firstPut.getDocumentPut());
+ assertResponseHasSingleRemove(queue.getNext(), "id:foo:testdoc::bar");
+ assertResponseHasSinglePut(queue.getNext(), secondPut.getDocumentPut());
+ assertResponseHasSingleRemove(queue.getNext(), "id:foo:testdoc::fleeb");
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void unknown_message_throws_unsupported_operation_exception() {
+ final VisitorDataQueue queue = new VisitorDataQueue();
+ queue.onMessage(new GetDocumentMessage(new DocumentId("id:foo:testdoc::bar")), createDummyAckToken());
+ }
+
+}