diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-01-16 15:26:25 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-01-16 15:26:25 +0000 |
commit | b9b0cf87f7335fed7af9854a2f1a63617c29451a (patch) | |
tree | 4f02618e0007c1addcb902d0f711b3f3f5870391 /storageapi | |
parent | da4d428fe674978239d816e4da768e592c188dbc (diff) |
Add internal read consistency enum to storage protocol Get requests
Diffstat (limited to 'storageapi')
7 files changed, 90 insertions, 10 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index dbd79e4fcca..2f959e40e2a 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -295,6 +295,28 @@ TEST_P(StorageProtocolTest, get) { EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } +TEST_P(StorageProtocolTest, get_internal_read_consistency_is_strong_by_default) { + auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); + EXPECT_EQ(cmd->internal_read_consistency(), InternalReadConsistency::Strong); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Strong); +} + +TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) { + // Only supported on protocol version 7+. Will default to Strong on older versions, which is what we want. + if (GetParam().getMajor() < 7) { + return; + } + auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); + cmd->set_internal_read_consistency(InternalReadConsistency::Weak); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Weak); + + cmd->set_internal_read_consistency(InternalReadConsistency::Strong); + cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Strong); +} + TEST_P(StorageProtocolTest, remove) { auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159); auto cmd2 = copyCommand(cmd); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 58da24df836..810f88f588f 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -61,6 +61,11 @@ message GetRequest { bytes document_id = 2; bytes field_set = 3; uint64 before_timestamp = 4; + enum InternalReadConsistency { + Strong = 0; // Default for a good reason. + Weak = 1; + } + InternalReadConsistency internal_read_consistency = 5; } message GetResponse { diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index bf56dd56db6..ea002ab98ed 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -530,6 +530,26 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cm // Get // ----------------------------------------------------------------- +namespace { + +protobuf::GetRequest_InternalReadConsistency read_consistency_to_protobuf(api::InternalReadConsistency consistency) { + switch (consistency) { + case api::InternalReadConsistency::Strong: return protobuf::GetRequest_InternalReadConsistency_Strong; + case api::InternalReadConsistency::Weak: return protobuf::GetRequest_InternalReadConsistency_Weak; + default: return protobuf::GetRequest_InternalReadConsistency_Strong; + } +} + +api::InternalReadConsistency read_consistency_from_protobuf(protobuf::GetRequest_InternalReadConsistency consistency) { + switch (consistency) { + case protobuf::GetRequest_InternalReadConsistency_Strong: return api::InternalReadConsistency::Strong; + case protobuf::GetRequest_InternalReadConsistency_Weak: return api::InternalReadConsistency::Weak; + default: return api::InternalReadConsistency::Strong; + } +} + +} + void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) { auto doc_id = msg.getDocumentId().toString(); @@ -538,6 +558,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) co if (!msg.getFieldSet().empty()) { req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); } + req.set_internal_read_consistency(read_consistency_to_protobuf(msg.internal_read_consistency())); }); } @@ -553,8 +574,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) cons api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const { return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) { document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); - return std::make_unique<api::GetCommand>(bucket, std::move(doc_id), - req.field_set(), req.before_timestamp()); + auto op = std::make_unique<api::GetCommand>(bucket, std::move(doc_id), + req.field_set(), req.before_timestamp()); + op->set_internal_read_consistency(read_consistency_from_protobuf(req.internal_read_consistency())); + return op; }); } diff --git a/storageapi/src/vespa/storageapi/message/persistence.cpp b/storageapi/src/vespa/storageapi/message/persistence.cpp index 54df92546a5..1463f42abeb 100644 --- a/storageapi/src/vespa/storageapi/message/persistence.cpp +++ b/storageapi/src/vespa/storageapi/message/persistence.cpp @@ -180,7 +180,8 @@ GetCommand::GetCommand(const document::Bucket &bucket, const document::DocumentI : BucketInfoCommand(MessageType::GET, bucket), _docId(docId), _beforeTimestamp(before), - _fieldSet(fieldSet) + _fieldSet(fieldSet), + _internal_read_consistency(InternalReadConsistency::Strong) { } diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h index f2161421feb..e6fe2b6dae5 100644 --- a/storageapi/src/vespa/storageapi/message/persistence.h +++ b/storageapi/src/vespa/storageapi/message/persistence.h @@ -184,7 +184,7 @@ class GetCommand : public BucketInfoCommand { document::DocumentId _docId; Timestamp _beforeTimestamp; vespalib::string _fieldSet; - + InternalReadConsistency _internal_read_consistency; public: GetCommand(const document::Bucket &bucket, const document::DocumentId&, vespalib::stringref fieldSet, Timestamp before = MAX_TIMESTAMP); @@ -194,6 +194,12 @@ public: Timestamp getBeforeTimestamp() const { return _beforeTimestamp; } const vespalib::string& getFieldSet() const { return _fieldSet; } void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; } + InternalReadConsistency internal_read_consistency() const noexcept { + return _internal_read_consistency; + } + void set_internal_read_consistency(InternalReadConsistency consistency) noexcept { + _internal_read_consistency = consistency; + } vespalib::string getSummary() const override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index 40422ce06c4..0e871720ad0 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -302,12 +302,10 @@ StorageMessage::getSummary() const { const char* to_string(LockingRequirements req) noexcept { switch (req) { - case LockingRequirements::Exclusive: - return "Exclusive"; - case LockingRequirements::Shared: - return "Shared"; + case LockingRequirements::Exclusive: return "Exclusive"; + case LockingRequirements::Shared: return "Shared"; + default: abort(); } - assert(false); } std::ostream& operator<<(std::ostream& os, LockingRequirements req) { @@ -315,4 +313,17 @@ std::ostream& operator<<(std::ostream& os, LockingRequirements req) { return os; } +const char* to_string(InternalReadConsistency consistency) noexcept { + switch (consistency) { + case InternalReadConsistency::Strong: return "Strong"; + case InternalReadConsistency::Weak: return "Weak"; + default: abort(); + } +} + +std::ostream& operator<<(std::ostream& os, InternalReadConsistency consistency) { + os << to_string(consistency); + return os; +} + } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index e119884bd1f..ffbc24cd724 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -316,9 +316,21 @@ enum class LockingRequirements : uint8_t { }; const char* to_string(LockingRequirements req) noexcept; - std::ostream& operator<<(std::ostream&, LockingRequirements); +// This mirrors spi::ReadConsistency and has the same semantics, but is +// decoupled to avoid extra cross-module dependencies. +// Note that the name _internal_ read consistency is intentional to lessen +// any ambiguities on whether this is consistency in a distributed systems +// setting (i.e. linearizability) on internally in the persistence provider. +enum class InternalReadConsistency : uint8_t { + Strong = 0, + Weak +}; + +const char* to_string(InternalReadConsistency consistency) noexcept; +std::ostream& operator<<(std::ostream&, InternalReadConsistency); + class StorageMessage : public vespalib::Printable { friend class StorageMessageTest; // Used for testing only |