summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-01-16 15:26:25 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-01-16 15:26:25 +0000
commitb9b0cf87f7335fed7af9854a2f1a63617c29451a (patch)
tree4f02618e0007c1addcb902d0f711b3f3f5870391 /storageapi
parentda4d428fe674978239d816e4da768e592c188dbc (diff)
Add internal read consistency enum to storage protocol Get requests
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp22
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto5
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp27
-rw-r--r--storageapi/src/vespa/storageapi/message/persistence.cpp3
-rw-r--r--storageapi/src/vespa/storageapi/message/persistence.h8
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp21
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h14
7 files changed, 90 insertions, 10 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index dbd79e4fcca..2f959e40e2a 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -295,6 +295,28 @@ TEST_P(StorageProtocolTest, get) {
EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
}
+TEST_P(StorageProtocolTest, get_internal_read_consistency_is_strong_by_default) {
+ auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
+ EXPECT_EQ(cmd->internal_read_consistency(), InternalReadConsistency::Strong);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Strong);
+}
+
+TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) {
+ // Only supported on protocol version 7+. Will default to Strong on older versions, which is what we want.
+ if (GetParam().getMajor() < 7) {
+ return;
+ }
+ auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
+ cmd->set_internal_read_consistency(InternalReadConsistency::Weak);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Weak);
+
+ cmd->set_internal_read_consistency(InternalReadConsistency::Strong);
+ cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Strong);
+}
+
TEST_P(StorageProtocolTest, remove) {
auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
auto cmd2 = copyCommand(cmd);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index 58da24df836..810f88f588f 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -61,6 +61,11 @@ message GetRequest {
bytes document_id = 2;
bytes field_set = 3;
uint64 before_timestamp = 4;
+ enum InternalReadConsistency {
+ Strong = 0; // Default for a good reason.
+ Weak = 1;
+ }
+ InternalReadConsistency internal_read_consistency = 5;
}
message GetResponse {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index bf56dd56db6..ea002ab98ed 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -530,6 +530,26 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cm
// Get
// -----------------------------------------------------------------
+namespace {
+
+protobuf::GetRequest_InternalReadConsistency read_consistency_to_protobuf(api::InternalReadConsistency consistency) {
+ switch (consistency) {
+ case api::InternalReadConsistency::Strong: return protobuf::GetRequest_InternalReadConsistency_Strong;
+ case api::InternalReadConsistency::Weak: return protobuf::GetRequest_InternalReadConsistency_Weak;
+ default: return protobuf::GetRequest_InternalReadConsistency_Strong;
+ }
+}
+
+api::InternalReadConsistency read_consistency_from_protobuf(protobuf::GetRequest_InternalReadConsistency consistency) {
+ switch (consistency) {
+ case protobuf::GetRequest_InternalReadConsistency_Strong: return api::InternalReadConsistency::Strong;
+ case protobuf::GetRequest_InternalReadConsistency_Weak: return api::InternalReadConsistency::Weak;
+ default: return api::InternalReadConsistency::Strong;
+ }
+}
+
+}
+
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const {
encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) {
auto doc_id = msg.getDocumentId().toString();
@@ -538,6 +558,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) co
if (!msg.getFieldSet().empty()) {
req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
}
+ req.set_internal_read_consistency(read_consistency_to_protobuf(msg.internal_read_consistency()));
});
}
@@ -553,8 +574,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) cons
api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) {
document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
- return std::make_unique<api::GetCommand>(bucket, std::move(doc_id),
- req.field_set(), req.before_timestamp());
+ auto op = std::make_unique<api::GetCommand>(bucket, std::move(doc_id),
+ req.field_set(), req.before_timestamp());
+ op->set_internal_read_consistency(read_consistency_from_protobuf(req.internal_read_consistency()));
+ return op;
});
}
diff --git a/storageapi/src/vespa/storageapi/message/persistence.cpp b/storageapi/src/vespa/storageapi/message/persistence.cpp
index 54df92546a5..1463f42abeb 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.cpp
+++ b/storageapi/src/vespa/storageapi/message/persistence.cpp
@@ -180,7 +180,8 @@ GetCommand::GetCommand(const document::Bucket &bucket, const document::DocumentI
: BucketInfoCommand(MessageType::GET, bucket),
_docId(docId),
_beforeTimestamp(before),
- _fieldSet(fieldSet)
+ _fieldSet(fieldSet),
+ _internal_read_consistency(InternalReadConsistency::Strong)
{
}
diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h
index f2161421feb..e6fe2b6dae5 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.h
+++ b/storageapi/src/vespa/storageapi/message/persistence.h
@@ -184,7 +184,7 @@ class GetCommand : public BucketInfoCommand {
document::DocumentId _docId;
Timestamp _beforeTimestamp;
vespalib::string _fieldSet;
-
+ InternalReadConsistency _internal_read_consistency;
public:
GetCommand(const document::Bucket &bucket, const document::DocumentId&,
vespalib::stringref fieldSet, Timestamp before = MAX_TIMESTAMP);
@@ -194,6 +194,12 @@ public:
Timestamp getBeforeTimestamp() const { return _beforeTimestamp; }
const vespalib::string& getFieldSet() const { return _fieldSet; }
void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; }
+ InternalReadConsistency internal_read_consistency() const noexcept {
+ return _internal_read_consistency;
+ }
+ void set_internal_read_consistency(InternalReadConsistency consistency) noexcept {
+ _internal_read_consistency = consistency;
+ }
vespalib::string getSummary() const override;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index 40422ce06c4..0e871720ad0 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -302,12 +302,10 @@ StorageMessage::getSummary() const {
const char* to_string(LockingRequirements req) noexcept {
switch (req) {
- case LockingRequirements::Exclusive:
- return "Exclusive";
- case LockingRequirements::Shared:
- return "Shared";
+ case LockingRequirements::Exclusive: return "Exclusive";
+ case LockingRequirements::Shared: return "Shared";
+ default: abort();
}
- assert(false);
}
std::ostream& operator<<(std::ostream& os, LockingRequirements req) {
@@ -315,4 +313,17 @@ std::ostream& operator<<(std::ostream& os, LockingRequirements req) {
return os;
}
+const char* to_string(InternalReadConsistency consistency) noexcept {
+ switch (consistency) {
+ case InternalReadConsistency::Strong: return "Strong";
+ case InternalReadConsistency::Weak: return "Weak";
+ default: abort();
+ }
+}
+
+std::ostream& operator<<(std::ostream& os, InternalReadConsistency consistency) {
+ os << to_string(consistency);
+ return os;
+}
+
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index e119884bd1f..ffbc24cd724 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -316,9 +316,21 @@ enum class LockingRequirements : uint8_t {
};
const char* to_string(LockingRequirements req) noexcept;
-
std::ostream& operator<<(std::ostream&, LockingRequirements);
+// This mirrors spi::ReadConsistency and has the same semantics, but is
+// decoupled to avoid extra cross-module dependencies.
+// Note that the name _internal_ read consistency is intentional to lessen
+// any ambiguities on whether this is consistency in a distributed systems
+// setting (i.e. linearizability) on internally in the persistence provider.
+enum class InternalReadConsistency : uint8_t {
+ Strong = 0,
+ Weak
+};
+
+const char* to_string(InternalReadConsistency consistency) noexcept;
+std::ostream& operator<<(std::ostream&, InternalReadConsistency);
+
class StorageMessage : public vespalib::Printable
{
friend class StorageMessageTest; // Used for testing only