From 4fffbbfe68797a4f62b07fcfc451042130db081d Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 22 Jun 2017 14:30:00 +0200 Subject: Remove MessageBus sequencing for read only operations We cannot guarantee sequencing when operations hit different SourceSessions in the first place, so we should not sacrifice performance and parallelism by chasing an unreachable goal. Clients should only rely on the standard mechanism for write visibility, which is to receive an ACK. Add ACK barriers to AsyncSession test to avoid implicit sequencing requirement. --- .../messagebus/protocol/GetBucketListMessage.java | 10 ------ .../messagebus/protocol/GetDocumentMessage.java | 10 ------ .../messagebus/protocol/StatBucketMessage.java | 10 ------ .../messagebus/protocol/MessageSequencingTest.java | 42 ++++++++++++++++++++++ .../test/AbstractDocumentApiTestCase.java | 34 +++++++++++++----- 5 files changed, 68 insertions(+), 38 deletions(-) create mode 100644 documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java (limited to 'documentapi') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java index 59efcd3df3a..74629018e55 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetBucketListMessage.java @@ -28,16 +28,6 @@ public class GetBucketListMessage extends DocumentMessage { return new StatBucketReply(); } - @Override - public boolean hasSequenceId() { - return true; - } - - @Override - public long getSequenceId() { - return bucketId.getRawId(); - } - @Override public int getApproxSize() { return super.getApproxSize() + 8; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java index 38ea0a02893..a7c3e400e4c 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/GetDocumentMessage.java @@ -76,16 +76,6 @@ public class GetDocumentMessage extends DocumentMessage { return super.getApproxSize() + 4 + documentId.toString().length(); } - @Override - public boolean hasSequenceId() { - return true; - } - - @Override - public long getSequenceId() { - return Arrays.hashCode(documentId.getGlobalId()); - } - @Override public int getType() { return DocumentProtocol.MESSAGE_GETDOCUMENT; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java index ed08c23504e..9b2176d85f6 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StatBucketMessage.java @@ -43,16 +43,6 @@ public class StatBucketMessage extends DocumentMessage { return super.getApproxSize() + 8 + documentSelection.length(); } - @Override - public boolean hasSequenceId() { - return true; - } - - @Override - public long getSequenceId() { - return bucketId.getRawId(); - } - @Override public int getType() { return DocumentProtocol.MESSAGE_STATBUCKET; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java new file mode 100644 index 00000000000..28abd7b0277 --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/MessageSequencingTest.java @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.document.BucketId; +import com.yahoo.document.DocumentId; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; + +/** + * @author vekterli + */ +public class MessageSequencingTest { + + /* + * Sequencing read-only operations artificially limits parallelization of such operations. + * We do not violate linearizability by not sequencing, as it only assumes that a write + * will become visible at some "atomic" point between sending the write and receiving an + * ACK for it. I.e. if we have not received an ACK, we cannot guarantee operation visibility + * either way. Sending off a read just after sending a write inherently does not satisfy + * this requirement for visibility. + */ + + @Test + public void get_document_message_is_not_sequenced() { + GetDocumentMessage message = new GetDocumentMessage(new DocumentId("id:foo:bar::baz")); + assertFalse(message.hasSequenceId()); + } + + @Test + public void stat_bucket_message_is_not_sequenced() { + StatBucketMessage message = new StatBucketMessage(new BucketId(16, 1), ""); + assertFalse(message.hasSequenceId()); + } + + @Test + public void get_bucket_list_message_is_not_sequenced() { + GetBucketListMessage message = new GetBucketListMessage(new BucketId(16, 1)); + assertFalse(message.hasSequenceId()); + } + +} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java index 672cd3a1573..ed862b75828 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/test/AbstractDocumentApiTestCase.java @@ -3,15 +3,16 @@ package com.yahoo.documentapi.test; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentType; import com.yahoo.documentapi.*; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -70,40 +71,45 @@ public abstract class AbstractDocumentApiTestCase { assertTrue(result.isSuccess()); results.put(result.getRequestId(), new Response(result.getRequestId())); + List responses = new ArrayList<>(); + waitForAcks(session, 2, responses); + result = session.get(new DocumentId("doc:music:1")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc1)); result = session.get(new DocumentId("doc:music:2")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc2)); + // These Gets shall observe the ACKed Puts sent for the same document IDs. + waitForAcks(session, 2, responses); result = session.remove(new DocumentId("doc:music:1")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new Response(result.getRequestId())); + waitForAcks(session, 1, responses); + result = session.get(new DocumentId("doc:music:1")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId())); result = session.get(new DocumentId("doc:music:2")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId(), doc2)); + waitForAcks(session, 2, responses); result = session.remove(new DocumentId("doc:music:2")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new Response(result.getRequestId())); + waitForAcks(session, 1, responses); + result = session.get(new DocumentId("doc:music:1")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId())); result = session.get(new DocumentId("doc:music:2")); assertTrue(result.isSuccess()); results.put(result.getRequestId(), new DocumentResponse(result.getRequestId())); + waitForAcks(session, 2, responses); - for (int i = 0; i < 4; i++) { - Response response; - if (i % 2 == 0) { - response = pollNext(session); - } else { - response = session.getNext(10000); - } + for (Response response : responses) { assertTrue(response.isSuccess()); assertEquals(results.get(response.getRequestId()), response); } @@ -123,6 +129,18 @@ public abstract class AbstractDocumentApiTestCase { session.destroy(); } + private static void waitForAcks(AsyncSession session, int n, List responsesOut) throws InterruptedException { + for (int i = 0; i < n; ++i) { + Response response; + if (i % 2 == 0) { + response = pollNext(session); + } else { + response = session.getNext(60000); + } + responsesOut.add(response); + } + } + private static Response pollNext(AsyncSession session) throws InterruptedException { for (int i = 0; i < 600; ++i) { Response response = session.getNext(); -- cgit v1.2.3