diff options
Diffstat (limited to 'storage/src')
10 files changed, 105 insertions, 81 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 3a3a3a6b016..ec8d6855abc 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -29,6 +29,8 @@ using namespace ::testing; using std::shared_ptr; using document::BucketSpace; +using document::Bucket; +using document::BucketId; using document::ByteBuffer; using document::Document; using document::DocumentId; @@ -127,7 +129,8 @@ TEST_F(StorageProtocolTest, testAddress50) { address.to_mbus_route().toString()); } -template<typename Command> std::shared_ptr<Command> +template <typename Command> +std::shared_ptr<Command> StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m) { auto mbusMessage = std::make_unique<mbusprot::StorageCommand>(m); @@ -145,7 +148,8 @@ StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m) return std::dynamic_pointer_cast<Command>(internalMessage); } -template<typename Reply> std::shared_ptr<Reply> +template <typename Reply> +std::shared_ptr<Reply> StorageProtocolTest::copyReply(const std::shared_ptr<Reply>& m) { auto mbusMessage = std::make_unique<mbusprot::StorageReply>(m); @@ -222,28 +226,36 @@ TEST_P(StorageProtocolTest, all_zero_bucket_info_is_propagated) { TEST_P(StorageProtocolTest, request_metadata_is_propagated) { auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); - cmd->forceMsgId(12345); + const auto sender_internal_msg_id = cmd->getMsgId(); cmd->setPriority(50); cmd->setSourceIndex(321); auto cmd2 = copyCommand(cmd); - EXPECT_EQ(12345, cmd2->getMsgId()); + EXPECT_EQ(cmd2->originator_msg_id(), sender_internal_msg_id); EXPECT_EQ(50, cmd2->getPriority()); EXPECT_EQ(321, cmd2->getSourceIndex()); + // The new message should get new _internal_ message ID + EXPECT_NE(cmd2->getMsgId(), sender_internal_msg_id); } TEST_P(StorageProtocolTest, response_metadata_is_propagated) { auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); + const auto cmd_internal_msg_id = cmd->getMsgId(); auto cmd2 = copyCommand(cmd); - auto reply = std::make_shared<PutReply>(*cmd2); - reply->forceMsgId(1234); + auto reply = std::make_shared<PutReply>(*cmd2); // Transitively inherits originator message ID from cmd reply->setPriority(101); ReturnCode result(ReturnCode::TEST_AND_SET_CONDITION_FAILED, "foo is not bar"); reply->setResult(result); auto reply2 = copyReply(reply); EXPECT_EQ(result, reply2->getResult()); - EXPECT_EQ(1234, reply->getMsgId()); - EXPECT_EQ(101, reply->getPriority()); + // Replies inherit the message ID from the command they are created for. In the current protocol + // implementation we implicitly set the reply's message ID directly from the command associated + // with the send-state, but older versions set it from what arrives over the wire. + // The originator ID is thus not actually used by us, but we set and check it here just to ensure we + // still propagate it back correctly over the wire (in the glorious name of backwards compatibility). + EXPECT_EQ(reply2->getMsgId(), cmd_internal_msg_id); + EXPECT_EQ(reply2->originator_msg_id(), cmd_internal_msg_id); + EXPECT_EQ(reply2->getPriority(), 101); } TEST_P(StorageProtocolTest, update) { @@ -847,25 +859,28 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag } TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) { - EXPECT_EQ(72u, sizeof(StorageMessage)); - EXPECT_EQ(88u, sizeof(StorageReply)); - EXPECT_EQ(112u, sizeof(BucketReply)); - EXPECT_EQ(8u, sizeof(document::BucketId)); - EXPECT_EQ(16u, sizeof(document::Bucket)); - EXPECT_EQ(32u, sizeof(BucketInfo)); - EXPECT_EQ(144u, sizeof(BucketInfoReply)); - EXPECT_EQ(288u, sizeof(PutReply)); - EXPECT_EQ(272u, sizeof(UpdateReply)); - EXPECT_EQ(264u, sizeof(RemoveReply)); - EXPECT_EQ(352u, sizeof(GetReply)); - EXPECT_EQ(88u, sizeof(StorageCommand)); - EXPECT_EQ(112u, sizeof(BucketCommand)); - EXPECT_EQ(112u, sizeof(BucketInfoCommand)); - EXPECT_EQ(112u + sizeof(vespalib::string), sizeof(TestAndSetCommand)); - EXPECT_EQ(144u + sizeof(vespalib::string), sizeof(PutCommand)); - EXPECT_EQ(144u + sizeof(vespalib::string), sizeof(UpdateCommand)); - EXPECT_EQ(224u + sizeof(vespalib::string), sizeof(RemoveCommand)); - EXPECT_EQ(296u + sizeof(documentapi::TestAndSetCondition), sizeof(GetCommand)); + constexpr size_t msg_baseline = 80u; + constexpr size_t reply_baseline = 96; + + EXPECT_EQ(sizeof(StorageMessage), msg_baseline); + EXPECT_EQ(sizeof(StorageReply), reply_baseline); + EXPECT_EQ(sizeof(BucketReply), reply_baseline + 24); + EXPECT_EQ(sizeof(BucketId), 8); + EXPECT_EQ(sizeof(Bucket), 16); + EXPECT_EQ(sizeof(BucketInfo), 32); + EXPECT_EQ(sizeof(BucketInfoReply), reply_baseline + 56); + EXPECT_EQ(sizeof(PutReply), reply_baseline + 200); + EXPECT_EQ(sizeof(UpdateReply), reply_baseline + 184); + EXPECT_EQ(sizeof(RemoveReply), reply_baseline + 176); + EXPECT_EQ(sizeof(GetReply), reply_baseline + 264); + EXPECT_EQ(sizeof(StorageCommand), msg_baseline + 16); + EXPECT_EQ(sizeof(BucketCommand), sizeof(StorageCommand) + 24); + EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand)); + EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string)); + EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 32); + EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32); + EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112); + EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184); } } // storage::api diff --git a/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp b/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp index ea59fefc924..28f19c74e3c 100644 --- a/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp +++ b/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp @@ -33,7 +33,7 @@ TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { hash_of("foo", lib::NodeType::STORAGE, 1)); EXPECT_EQ(16u, sizeof(StorageMessageAddress)); - EXPECT_EQ(72u, sizeof(StorageMessage)); + EXPECT_EQ(80u, sizeof(StorageMessage)); EXPECT_EQ(16u, sizeof(mbus::Trace)); } diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index e494f4e67da..34d8923c6e6 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -290,6 +290,7 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { // TODO the reply wrapper does lazy deserialization. Can we/should we ever defer? auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing assert(reply); + assert(reply->getMsgId() == cmd.getMsgId()); if (!hdr.trace_payload().empty()) { cmd.getTrace().addChild(mbus::TraceNode::decode(hdr.trace_payload())); diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 0ede96179e8..f2dcf07c01a 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -88,7 +88,7 @@ std::shared_ptr<document::Document> get_document(const protobuf::Document& src_d vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size()); return std::make_shared<document::Document>(type_repo, doc_buf); } - return std::shared_ptr<document::Document>(); + return {}; } void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) { @@ -104,12 +104,17 @@ std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src return document::DocumentUpdate::createHEAD( type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); } - return std::shared_ptr<document::DocumentUpdate>(); + return {}; } void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages - hdr.set_message_id(cmd.getMsgId()); + // TODO deprecate this field entirely; we already match replies with their originating command + // on the sender and set the reply's message ID from that command directly. There's therefore + // no need to redundantly carry this on the wire as well. + // Important: cannot be deprecated until there are no nodes using this value verbatim as the + // internal message ID! + hdr.set_message_id(cmd.getMsgId()); // This is _our_ process-internal message ID hdr.set_priority(cmd.getPriority()); hdr.set_source_index(cmd.getSourceIndex()); @@ -129,7 +134,8 @@ void write_response_header(vespalib::GrowableByteBuffer& buf, const api::Storage if (!result.getMessage().empty()) { hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size()); } - hdr.set_message_id(reply.getMsgId()); + // TODO deprecate this field entirely + hdr.set_message_id(reply.originator_msg_id()); // This is the _peer's_ process-internal message ID hdr.set_priority(reply.getPriority()); const auto header_size = hdr.ByteSizeLong(); @@ -227,11 +233,11 @@ public: template <typename ProtobufType> class RequestDecoder { - protobuf::RequestHeader _hdr; - ::google::protobuf::Arena _arena; - ProtobufType* _proto_obj; + protobuf::RequestHeader _hdr; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; public: - RequestDecoder(document::ByteBuffer& in_buf) + explicit RequestDecoder(document::ByteBuffer& in_buf) : _arena(), _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) { @@ -246,7 +252,7 @@ public: } void transfer_meta_information_to(api::StorageCommand& dest) { - dest.forceMsgId(_hdr.message_id()); + dest.force_originator_msg_id(_hdr.message_id()); // TODO deprecate dest.setPriority(static_cast<uint8_t>(_hdr.priority())); dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index())); } @@ -276,7 +282,9 @@ public: } void transfer_meta_information_to(api::StorageReply& dest) { - dest.forceMsgId(_hdr.message_id()); + // TODO deprecate this; not currently used internally (message ID is explicitly set + // from the originator command instance) + dest.force_originator_msg_id(_hdr.message_id()); dest.setPriority(static_cast<uint8_t>(_hdr.priority())); dest.setResult(api::ReturnCode(static_cast<api::ReturnCode::Result>(_hdr.return_code_id()), _hdr.return_code_message())); diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp index 520f1aa2741..9c73fc7b9ee 100644 --- a/storage/src/vespa/storageapi/message/bucket.cpp +++ b/storage/src/vespa/storageapi/message/bucket.cpp @@ -240,7 +240,7 @@ GetBucketDiffCommand::print(std::ostream& out, bool verbose, } } else { out << ", " << _diff.size() << " entries"; - out << ", id " << _msgId; + out << ", id " << getMsgId(); } out << ")"; if (verbose) { @@ -278,7 +278,7 @@ GetBucketDiffReply::print(std::ostream& out, bool verbose, } } else { out << ", " << _diff.size() << " entries"; - out << ", id " << _msgId; + out << ", id " << getMsgId(); } out << ")"; if (verbose) { @@ -385,7 +385,7 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose, } } else { out << ", " << _diff.size() << " entries"; - out << ", id " << _msgId; + out << ", id " << getMsgId(); } out << ")"; if (verbose) { @@ -430,7 +430,7 @@ ApplyBucketDiffReply::print(std::ostream& out, bool verbose, } } else { out << ", " << _diff.size() << " entries"; - out << ", id " << _msgId; + out << ", id " << getMsgId(); } out << ")"; if (verbose) { diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp index 935b83cdd59..249e08362d4 100644 --- a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp +++ b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp @@ -11,14 +11,14 @@ namespace { } StorageCommand::StorageCommand(const StorageCommand& other) - : StorageMessage(other, generateMsgId()), + : StorageMessage(other, generateMsgId(), 0), _timeout(other._timeout), _sourceIndex(other._sourceIndex) { } StorageCommand::StorageCommand(const MessageType& type, Priority p) - : StorageMessage(type, generateMsgId()), + : StorageMessage(type, generateMsgId(), 0), _timeout(MAX_TIMEOUT), _sourceIndex(0xFFFF) { @@ -30,8 +30,8 @@ StorageCommand::~StorageCommand() = default; void StorageCommand::print(std::ostream& out, bool, const std::string&) const { - out << "StorageCommand(" << _type.getName(); - if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority); + out << "StorageCommand(" << getType().getName(); + if (getPriority() != NORMAL) out << ", priority = " << static_cast<int>(getPriority()); if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex; out << ", timeout = " << vespalib::count_ms(_timeout) << " ms"; out << ")"; diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.h b/storage/src/vespa/storageapi/messageapi/storagecommand.h index f1b736a28c8..06817a65b05 100644 --- a/storage/src/vespa/storageapi/messageapi/storagecommand.h +++ b/storage/src/vespa/storageapi/messageapi/storagecommand.h @@ -1,13 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::api::StorageCommand - * @ingroup messageapi - * - * @brief Superclass for all storage commands. + * Superclass for all storage commands. * * A storage command is a storage message you will get a storage reply for. - * - * @version $Id$ */ #pragma once @@ -24,7 +19,7 @@ class StorageCommand : public StorageMessage { uint16_t _sourceIndex; protected: - explicit StorageCommand(const StorageCommand& other); + StorageCommand(const StorageCommand& other); explicit StorageCommand(const MessageType& type, Priority p = NORMAL); public: diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp index c72ece80476..f78e8e7a8f5 100644 --- a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -236,9 +236,10 @@ StorageMessage::generateMsgId() noexcept return _G_lastMsgId.fetch_add(1, std::memory_order_relaxed); } -StorageMessage::StorageMessage(const MessageType& type, Id id) noexcept +StorageMessage::StorageMessage(const MessageType& type, Id internal_id, Id originator_id) noexcept : _type(type), - _msgId(id), + _internal_msg_id(internal_id), + _originator_msg_id(originator_id), _address(), _trace(), _approxByteSize(50), @@ -246,9 +247,10 @@ StorageMessage::StorageMessage(const MessageType& type, Id id) noexcept { } -StorageMessage::StorageMessage(const StorageMessage& other, Id id) noexcept +StorageMessage::StorageMessage(const StorageMessage& other, Id internal_id, Id originator_id) noexcept : _type(other._type), - _msgId(id), + _internal_msg_id(internal_id), + _originator_msg_id(originator_id), _address(), _trace(other.getTrace().getLevel()), _approxByteSize(other._approxByteSize), diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.h b/storage/src/vespa/storageapi/messageapi/storagemessage.h index 4649781c1e5..831e44bdba9 100644 --- a/storage/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storage/src/vespa/storageapi/messageapi/storagemessage.h @@ -1,12 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::api::StorageMessage - * @ingroup messageapi - * - * @brief Superclass for all storage messages. - * - * @version $Id$ + * Superclass for all storage messages. */ #pragma once @@ -70,10 +65,7 @@ namespace storage::api { using duration = vespalib::duration; /** - * @class MessageType - * @ingroup messageapi - * - * @brief This class defines the different message types we have. + * This class defines the different message types we have. * * This is used to be able to deserialize messages of various classes. */ @@ -348,34 +340,45 @@ public: static const char* getPriorityString(Priority); private: - static document::Bucket getDummyBucket() noexcept { return document::Bucket(document::BucketSpace::invalid(), document::BucketId()); } - mutable std::unique_ptr<TransportContext> _transportContext; - -protected: - static Id generateMsgId() noexcept; + static document::Bucket getDummyBucket() noexcept { + return {document::BucketSpace::invalid(), document::BucketId()}; + } + mutable std::unique_ptr<TransportContext> _transportContext; const MessageType& _type; - Id _msgId; + Id _internal_msg_id; + Id _originator_msg_id; StorageMessageAddress _address; vespalib::Trace _trace; uint32_t _approxByteSize; Priority _priority; - StorageMessage(const MessageType& code, Id id) noexcept; - StorageMessage(const StorageMessage&, Id id) noexcept; +protected: + static Id generateMsgId() noexcept; + + StorageMessage(const MessageType& code, Id internal_id, Id originator_id) noexcept; + StorageMessage(const StorageMessage&, Id internal_id, Id originator_id) noexcept; public: StorageMessage& operator=(const StorageMessage&) = delete; StorageMessage(const StorageMessage&) = delete; ~StorageMessage() override; - Id getMsgId() const noexcept { return _msgId; } + /** + * Process-unique internal ID. For replies, corresponds to the message ID of its command. + */ + Id getMsgId() const noexcept { return _internal_msg_id; } + /** + * If the message has an originator (i.e. sent from a peer), this is that peer's process-unique internal ID. + * It must never be used by any _other_ nodes in a context where uniqueness is assumed. + */ + [[nodiscard]] Id originator_msg_id() const noexcept { return _originator_msg_id; } /** - * Set the id of this message. Typically used to set the id to a - * unique value previously generated with the generateMsgId method. + * Set the originator ID of this message, to associate local messages with those + * generated by a remote peer. Does not affect the process-unique internal ID. **/ - void forceMsgId(Id msgId) noexcept { _msgId = msgId; } + void force_originator_msg_id(Id msgId) noexcept { _originator_msg_id = msgId; } const MessageType& getType() const noexcept { return _type; } diff --git a/storage/src/vespa/storageapi/messageapi/storagereply.cpp b/storage/src/vespa/storageapi/messageapi/storagereply.cpp index a6fefe57e08..8e09943facb 100644 --- a/storage/src/vespa/storageapi/messageapi/storagereply.cpp +++ b/storage/src/vespa/storageapi/messageapi/storagereply.cpp @@ -11,7 +11,7 @@ StorageReply::StorageReply(const StorageCommand& cmd) {} StorageReply::StorageReply(const StorageCommand& cmd, ReturnCode code) - : StorageMessage(cmd.getType().getReplyType(), cmd.getMsgId()), + : StorageMessage(cmd.getType().getReplyType(), cmd.getMsgId(), cmd.originator_msg_id()), _result(std::move(code)) { setPriority(cmd.getPriority()); @@ -32,7 +32,7 @@ StorageReply::~StorageReply() = default; void StorageReply::print(std::ostream& out, bool , const std::string& ) const { - out << "StorageReply(" << _type.getName() << ", " << _result << ")"; + out << "StorageReply(" << getType().getName() << ", " << _result << ")"; } } |