summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java10
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java10
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java10
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java42
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java34
5 files changed, 68 insertions, 38 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java
index 59efcd3df3a..74629018e55 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java
@@ -29,16 +29,6 @@ public class GetBucketListMessage extends DocumentMessage {
}
@Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return bucketId.getRawId();
- }
-
- @Override
public int getApproxSize() {
return super.getApproxSize() + 8;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java
index 38ea0a02893..a7c3e400e4c 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java
@@ -77,16 +77,6 @@ public class GetDocumentMessage extends DocumentMessage {
}
@Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return Arrays.hashCode(documentId.getGlobalId());
- }
-
- @Override
public int getType() {
return DocumentProtocol.MESSAGE_GETDOCUMENT;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java
index ed08c23504e..9b2176d85f6 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java
@@ -44,16 +44,6 @@ public class StatBucketMessage extends DocumentMessage {
}
@Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return bucketId.getRawId();
- }
-
- @Override
public int getType() {
return DocumentProtocol.MESSAGE_STATBUCKET;
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java
new file mode 100644
index 00000000000..28abd7b0277
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java
@@ -0,0 +1,42 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.document.BucketId;
+import com.yahoo.document.DocumentId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * @author vekterli
+ */
+public class MessageSequencingTest {
+
+ /*
+ * Sequencing read-only operations artificially limits parallelization of such operations.
+ * We do not violate linearizability by not sequencing, as it only assumes that a write
+ * will become visible at some "atomic" point between sending the write and receiving an
+ * ACK for it. I.e. if we have not received an ACK, we cannot guarantee operation visibility
+ * either way. Sending off a read just after sending a write inherently does not satisfy
+ * this requirement for visibility.
+ */
+
+ @Test
+ public void get_document_message_is_not_sequenced() {
+ GetDocumentMessage message = new GetDocumentMessage(new DocumentId("id:foo:bar::baz"));
+ assertFalse(message.hasSequenceId());
+ }
+
+ @Test
+ public void stat_bucket_message_is_not_sequenced() {
+ StatBucketMessage message = new StatBucketMessage(new BucketId(16, 1), "");
+ assertFalse(message.hasSequenceId());
+ }
+
+ @Test
+ public void get_bucket_list_message_is_not_sequenced() {
+ GetBucketListMessage message = new GetBucketListMessage(new BucketId(16, 1));
+ assertFalse(message.hasSequenceId());
+ }
+
+}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java
index 672cd3a1573..ed862b75828 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java
@@ -3,15 +3,16 @@ package com.yahoo.documentapi.test;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentRemove;
import com.yahoo.document.DocumentType;
import com.yahoo.documentapi.*;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -70,40 +71,45 @@ public abstract class AbstractDocumentApiTestCase {
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new Response(result.getRequestId()));
+ List<Response> responses = new ArrayList<>();
+ waitForAcks(session, 2, responses);
+
result = session.get(new DocumentId("doc:music:1"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc1));
result = session.get(new DocumentId("doc:music:2"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc2));
+ // These Gets shall observe the ACKed Puts sent for the same document IDs.
+ waitForAcks(session, 2, responses);
result = session.remove(new DocumentId("doc:music:1"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new Response(result.getRequestId()));
+ waitForAcks(session, 1, responses);
+
result = session.get(new DocumentId("doc:music:1"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId()));
result = session.get(new DocumentId("doc:music:2"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc2));
+ waitForAcks(session, 2, responses);
result = session.remove(new DocumentId("doc:music:2"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new Response(result.getRequestId()));
+ waitForAcks(session, 1, responses);
+
result = session.get(new DocumentId("doc:music:1"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId()));
result = session.get(new DocumentId("doc:music:2"));
assertTrue(result.isSuccess());
results.put(result.getRequestId(), new DocumentResponse(result.getRequestId()));
+ waitForAcks(session, 2, responses);
- for (int i = 0; i < 4; i++) {
- Response response;
- if (i % 2 == 0) {
- response = pollNext(session);
- } else {
- response = session.getNext(10000);
- }
+ for (Response response : responses) {
assertTrue(response.isSuccess());
assertEquals(results.get(response.getRequestId()), response);
}
@@ -123,6 +129,18 @@ public abstract class AbstractDocumentApiTestCase {
session.destroy();
}
+ private static void waitForAcks(AsyncSession session, int n, List<Response> responsesOut) throws InterruptedException {
+ for (int i = 0; i < n; ++i) {
+ Response response;
+ if (i % 2 == 0) {
+ response = pollNext(session);
+ } else {
+ response = session.getNext(60000);
+ }
+ responsesOut.add(response);
+ }
+ }
+
private static Response pollNext(AsyncSession session) throws InterruptedException {
for (int i = 0; i < 600; ++i) {
Response response = session.getNext();