diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2016-07-28 12:35:47 +0200 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2016-07-28 12:35:47 +0200 |
commit | 893bb85f4517c3c8ea6c6b00315eb76df2f4e949 (patch) | |
tree | 260efd6b619a91bece5d5d48971d36da6288bd89 /documentapi | |
parent | f189b3d79f02347006db5753d00f4ad0bbf2208c (diff) |
Let VisitorDataQueue handle correct wire message types from visitors
Also add tests for basic (de)queuing functionality, since none existed
for this before.
Diffstat (limited to 'documentapi')
3 files changed, 161 insertions, 2 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java index 4cee27a9fda..ea261622bd9 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataHandler.java @@ -85,8 +85,9 @@ public abstract class VisitorDataHandler { /** * Called when a data message is received. * + * IMPORTANT: * May be called concurrently from multiple threads. Any internal state - * mutations must be done in a thread-safe manner. + * mutations MUST be done in a thread-safe manner. * * @param m The message received * @param token A token to reply with when finished processing the message. diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java index 5e65ee534ba..993638bb4b8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java @@ -2,9 +2,13 @@ package com.yahoo.documentapi; import com.yahoo.document.BucketId; +import com.yahoo.document.DocumentOperation; import com.yahoo.documentapi.messagebus.protocol.DocumentListEntry; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.messagebus.Message; import com.yahoo.vdslib.DocumentList; +import com.yahoo.vdslib.Entry; import java.util.LinkedList; import java.util.List; @@ -30,6 +34,7 @@ public class VisitorDataQueue extends VisitorDataHandler { } // Inherit doc from VisitorDataHandler + @Override public void reset() { super.reset(); synchronized (pendingResponses) { @@ -37,10 +42,32 @@ public class VisitorDataQueue extends VisitorDataHandler { } } + private void appendSingleOpToPendingList(final DocumentOperation op, final AckToken token) { + final DocumentList docList = DocumentList.create(Entry.create(op)); + final DocumentListVisitorResponse response = new DocumentListVisitorResponse(docList, token); + synchronized (pendingResponses) { + pendingResponses.add(response); + pendingResponses.notifyAll(); + } + } + + @Override public void onMessage(Message m, AckToken token) { + if (m instanceof PutDocumentMessage) { + appendSingleOpToPendingList(((PutDocumentMessage)m).getDocumentPut(), token); + } else if (m instanceof RemoveDocumentMessage) { + appendSingleOpToPendingList(((RemoveDocumentMessage)m).getDocumentRemove(), token); + } else { + throw new UnsupportedOperationException( + String.format("Expected put/remove message, got '%s' of type %s", + m.toString(), m.getClass().toString())); + } } - // Inherit doc from VisitorDataHandler + /** + * @deprecated This method is no longer called by the visitor subsystem. See onMessage instead. + */ + @Deprecated public void onDocuments(DocumentList docs, AckToken token) { synchronized (pendingResponses) { pendingResponses.add(new DocumentListVisitorResponse(docs, token)); @@ -49,6 +76,7 @@ public class VisitorDataQueue extends VisitorDataHandler { } // Inherit doc from VisitorDataHandler + @Override public VisitorResponse getNext() { synchronized (pendingResponses) { return (pendingResponses.isEmpty() @@ -57,6 +85,7 @@ public class VisitorDataQueue extends VisitorDataHandler { } // Inherit doc from VisitorDataHandler + @Override public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException { synchronized (pendingResponses) { if (pendingResponses.isEmpty()) { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java b/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java new file mode 100644 index 00000000000..6b2533cc488 --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/VisitorDataQueueTest.java @@ -0,0 +1,129 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.vdslib.Entry; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.hamcrest.core.IsNull.nullValue; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class VisitorDataQueueTest { + + private final DocumentTypeManager docMan = new DocumentTypeManager(); + + public VisitorDataQueueTest() { + DocumentTypeManagerConfigurer.configure(docMan, "file:./test/cfg/testdoc.cfg"); + } + + private PutDocumentMessage createPutMessage(final String docId) { + return new PutDocumentMessage(new DocumentPut(new Document(docMan.getDocumentType("testdoc"), docId))); + } + + private RemoveDocumentMessage createRemoveMessage(final String docId) { + return new RemoveDocumentMessage(new DocumentId(docId)); + } + + private AckToken createDummyAckToken() { + return new AckToken(new Object()); + } + + private void assertNonNullDocumentListResponse(final VisitorResponse response) { + assertThat(response, notNullValue()); + assertThat(response, instanceOf(DocumentListVisitorResponse.class)); + } + + private void assertResponseHasSinglePut(final VisitorResponse response, final DocumentPut expectedInstance) { + assertNonNullDocumentListResponse(response); + final DocumentListVisitorResponse visitorResponse = (DocumentListVisitorResponse)response; + assertThat(visitorResponse.getDocumentList().size(), equalTo(1)); + final Entry entry = visitorResponse.getDocumentList().get(0); + assertThat(entry.getDocumentOperation(), is(expectedInstance)); + } + + @Test + public void received_put_can_be_polled_via_non_timeout_getter() { + final VisitorDataQueue queue = new VisitorDataQueue(); + final PutDocumentMessage putMessage = createPutMessage("id:foo:testdoc::foo"); + queue.onMessage(putMessage, createDummyAckToken()); + final VisitorResponse response = queue.getNext(); + + assertResponseHasSinglePut(response, putMessage.getDocumentPut()); + assertThat(queue.getNext(), nullValue()); // Queue now empty + } + + @Test + public void received_put_can_be_polled_via_timeout_getter() throws InterruptedException { + final VisitorDataQueue queue = new VisitorDataQueue(); + final PutDocumentMessage putMessage = createPutMessage("id:foo:testdoc::foo"); + queue.onMessage(putMessage, createDummyAckToken()); + final VisitorResponse response = queue.getNext(1000); + + assertResponseHasSinglePut(response, putMessage.getDocumentPut()); + assertThat(queue.getNext(), nullValue()); // Queue now empty + } + + private void assertResponseHasSingleRemove(final VisitorResponse response, final String docId) { + assertNonNullDocumentListResponse(response); + final DocumentListVisitorResponse visitorResponse = (DocumentListVisitorResponse)response; + assertThat(visitorResponse.getDocumentList().size(), equalTo(1)); + final Entry entry = visitorResponse.getDocumentList().get(0); + assertThat(entry.isRemoveEntry(), is(true)); + assertThat(entry.getDocumentOperation(), instanceOf(DocumentRemove.class)); + assertThat(entry.getDocumentOperation().getId(), equalTo(new DocumentId(docId))); + } + + @Test + public void received_remove_can_be_polled_via_non_timeout_getter() { + final VisitorDataQueue queue = new VisitorDataQueue(); + queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken()); + final VisitorResponse response = queue.getNext(); + + assertResponseHasSingleRemove(response, "id:foo:testdoc::bar"); + } + + @Test + public void received_remove_can_be_polled_via_non_getter() throws InterruptedException { + final VisitorDataQueue queue = new VisitorDataQueue(); + queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken()); + final VisitorResponse response = queue.getNext(1000); + + assertResponseHasSingleRemove(response, "id:foo:testdoc::bar"); + } + + @Test + public void multiple_messages_are_enqueued_and_dequeued_in_fifo_order() { + final VisitorDataQueue queue = new VisitorDataQueue(); + final PutDocumentMessage firstPut = createPutMessage("id:foo:testdoc::foo"); + final PutDocumentMessage secondPut = createPutMessage("id:foo:testdoc::baz"); + + queue.onMessage(firstPut, createDummyAckToken()); + queue.onMessage(createRemoveMessage("id:foo:testdoc::bar"), createDummyAckToken()); + queue.onMessage(secondPut, createDummyAckToken()); + queue.onMessage(createRemoveMessage("id:foo:testdoc::fleeb"), createDummyAckToken()); + + assertResponseHasSinglePut(queue.getNext(), firstPut.getDocumentPut()); + assertResponseHasSingleRemove(queue.getNext(), "id:foo:testdoc::bar"); + assertResponseHasSinglePut(queue.getNext(), secondPut.getDocumentPut()); + assertResponseHasSingleRemove(queue.getNext(), "id:foo:testdoc::fleeb"); + } + + @Test(expected = UnsupportedOperationException.class) + public void unknown_message_throws_unsupported_operation_exception() { + final VisitorDataQueue queue = new VisitorDataQueue(); + queue.onMessage(new GetDocumentMessage(new DocumentId("id:foo:testdoc::bar")), createDummyAckToken()); + } + +} |