summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-04-28 16:45:05 +0200
committerGitHub <noreply@github.com>2023-04-28 16:45:05 +0200
commit77960e2d06b27ccb26b1a21596f6d82eee6706f2 (patch)
tree059cb6f1d5861ed955bcd6618115241b81f364ba
parent9dd0d84b1714e5e1544462ff3a2b02c42f98d847 (diff)
parent62778dab5c5ec29b3afb1231e3b4e94f1548142f (diff)
Merge pull request #26909 from vespa-engine/vekterli/ensure-node-internal-message-id-uniqueness
Ensure process-internal message ID uniqueness
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp69
-rw-r--r--storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp1
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp28
-rw-r--r--storage/src/vespa/storageapi/message/bucket.cpp8
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagecommand.cpp8
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagecommand.h9
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagemessage.cpp10
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagemessage.h47
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagereply.cpp4
10 files changed, 105 insertions, 81 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index 3a3a3a6b016..ec8d6855abc 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -29,6 +29,8 @@ using namespace ::testing;
using std::shared_ptr;
using document::BucketSpace;
+using document::Bucket;
+using document::BucketId;
using document::ByteBuffer;
using document::Document;
using document::DocumentId;
@@ -127,7 +129,8 @@ TEST_F(StorageProtocolTest, testAddress50) {
address.to_mbus_route().toString());
}
-template<typename Command> std::shared_ptr<Command>
+template <typename Command>
+std::shared_ptr<Command>
StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m)
{
auto mbusMessage = std::make_unique<mbusprot::StorageCommand>(m);
@@ -145,7 +148,8 @@ StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m)
return std::dynamic_pointer_cast<Command>(internalMessage);
}
-template<typename Reply> std::shared_ptr<Reply>
+template <typename Reply>
+std::shared_ptr<Reply>
StorageProtocolTest::copyReply(const std::shared_ptr<Reply>& m)
{
auto mbusMessage = std::make_unique<mbusprot::StorageReply>(m);
@@ -222,28 +226,36 @@ TEST_P(StorageProtocolTest, all_zero_bucket_info_is_propagated) {
TEST_P(StorageProtocolTest, request_metadata_is_propagated) {
auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
- cmd->forceMsgId(12345);
+ const auto sender_internal_msg_id = cmd->getMsgId();
cmd->setPriority(50);
cmd->setSourceIndex(321);
auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(12345, cmd2->getMsgId());
+ EXPECT_EQ(cmd2->originator_msg_id(), sender_internal_msg_id);
EXPECT_EQ(50, cmd2->getPriority());
EXPECT_EQ(321, cmd2->getSourceIndex());
+ // The new message should get new _internal_ message ID
+ EXPECT_NE(cmd2->getMsgId(), sender_internal_msg_id);
}
TEST_P(StorageProtocolTest, response_metadata_is_propagated) {
auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
+ const auto cmd_internal_msg_id = cmd->getMsgId();
auto cmd2 = copyCommand(cmd);
- auto reply = std::make_shared<PutReply>(*cmd2);
- reply->forceMsgId(1234);
+ auto reply = std::make_shared<PutReply>(*cmd2); // Transitively inherits originator message ID from cmd
reply->setPriority(101);
ReturnCode result(ReturnCode::TEST_AND_SET_CONDITION_FAILED, "foo is not bar");
reply->setResult(result);
auto reply2 = copyReply(reply);
EXPECT_EQ(result, reply2->getResult());
- EXPECT_EQ(1234, reply->getMsgId());
- EXPECT_EQ(101, reply->getPriority());
+ // Replies inherit the message ID from the command they are created for. In the current protocol
+ // implementation we implicitly set the reply's message ID directly from the command associated
+ // with the send-state, but older versions set it from what arrives over the wire.
+ // The originator ID is thus not actually used by us, but we set and check it here just to ensure we
+ // still propagate it back correctly over the wire (in the glorious name of backwards compatibility).
+ EXPECT_EQ(reply2->getMsgId(), cmd_internal_msg_id);
+ EXPECT_EQ(reply2->originator_msg_id(), cmd_internal_msg_id);
+ EXPECT_EQ(reply2->getPriority(), 101);
}
TEST_P(StorageProtocolTest, update) {
@@ -847,25 +859,28 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag
}
TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) {
- EXPECT_EQ(72u, sizeof(StorageMessage));
- EXPECT_EQ(88u, sizeof(StorageReply));
- EXPECT_EQ(112u, sizeof(BucketReply));
- EXPECT_EQ(8u, sizeof(document::BucketId));
- EXPECT_EQ(16u, sizeof(document::Bucket));
- EXPECT_EQ(32u, sizeof(BucketInfo));
- EXPECT_EQ(144u, sizeof(BucketInfoReply));
- EXPECT_EQ(288u, sizeof(PutReply));
- EXPECT_EQ(272u, sizeof(UpdateReply));
- EXPECT_EQ(264u, sizeof(RemoveReply));
- EXPECT_EQ(352u, sizeof(GetReply));
- EXPECT_EQ(88u, sizeof(StorageCommand));
- EXPECT_EQ(112u, sizeof(BucketCommand));
- EXPECT_EQ(112u, sizeof(BucketInfoCommand));
- EXPECT_EQ(112u + sizeof(vespalib::string), sizeof(TestAndSetCommand));
- EXPECT_EQ(144u + sizeof(vespalib::string), sizeof(PutCommand));
- EXPECT_EQ(144u + sizeof(vespalib::string), sizeof(UpdateCommand));
- EXPECT_EQ(224u + sizeof(vespalib::string), sizeof(RemoveCommand));
- EXPECT_EQ(296u + sizeof(documentapi::TestAndSetCondition), sizeof(GetCommand));
+ constexpr size_t msg_baseline = 80u;
+ constexpr size_t reply_baseline = 96;
+
+ EXPECT_EQ(sizeof(StorageMessage), msg_baseline);
+ EXPECT_EQ(sizeof(StorageReply), reply_baseline);
+ EXPECT_EQ(sizeof(BucketReply), reply_baseline + 24);
+ EXPECT_EQ(sizeof(BucketId), 8);
+ EXPECT_EQ(sizeof(Bucket), 16);
+ EXPECT_EQ(sizeof(BucketInfo), 32);
+ EXPECT_EQ(sizeof(BucketInfoReply), reply_baseline + 56);
+ EXPECT_EQ(sizeof(PutReply), reply_baseline + 200);
+ EXPECT_EQ(sizeof(UpdateReply), reply_baseline + 184);
+ EXPECT_EQ(sizeof(RemoveReply), reply_baseline + 176);
+ EXPECT_EQ(sizeof(GetReply), reply_baseline + 264);
+ EXPECT_EQ(sizeof(StorageCommand), msg_baseline + 16);
+ EXPECT_EQ(sizeof(BucketCommand), sizeof(StorageCommand) + 24);
+ EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand));
+ EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string));
+ EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 32);
+ EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32);
+ EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112);
+ EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184);
}
} // storage::api
diff --git a/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp b/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp
index ea59fefc924..28f19c74e3c 100644
--- a/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp
+++ b/storage/src/tests/storageapi/messageapi/storage_message_address_test.cpp
@@ -33,7 +33,7 @@ TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) {
hash_of("foo", lib::NodeType::STORAGE, 1));
EXPECT_EQ(16u, sizeof(StorageMessageAddress));
- EXPECT_EQ(72u, sizeof(StorageMessage));
+ EXPECT_EQ(80u, sizeof(StorageMessage));
EXPECT_EQ(16u, sizeof(mbus::Trace));
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index e494f4e67da..34d8923c6e6 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -290,6 +290,7 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
// TODO the reply wrapper does lazy deserialization. Can we/should we ever defer?
auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing
assert(reply);
+ assert(reply->getMsgId() == cmd.getMsgId());
if (!hdr.trace_payload().empty()) {
cmd.getTrace().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 0ede96179e8..f2dcf07c01a 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -88,7 +88,7 @@ std::shared_ptr<document::Document> get_document(const protobuf::Document& src_d
vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size());
return std::make_shared<document::Document>(type_repo, doc_buf);
}
- return std::shared_ptr<document::Document>();
+ return {};
}
void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) {
@@ -104,12 +104,17 @@ std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src
return document::DocumentUpdate::createHEAD(
type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
}
- return std::shared_ptr<document::DocumentUpdate>();
+ return {};
}
void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) {
protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages
- hdr.set_message_id(cmd.getMsgId());
+ // TODO deprecate this field entirely; we already match replies with their originating command
+ // on the sender and set the reply's message ID from that command directly. There's therefore
+ // no need to redundantly carry this on the wire as well.
+ // Important: cannot be deprecated until there are no nodes using this value verbatim as the
+ // internal message ID!
+ hdr.set_message_id(cmd.getMsgId()); // This is _our_ process-internal message ID
hdr.set_priority(cmd.getPriority());
hdr.set_source_index(cmd.getSourceIndex());
@@ -129,7 +134,8 @@ void write_response_header(vespalib::GrowableByteBuffer& buf, const api::Storage
if (!result.getMessage().empty()) {
hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size());
}
- hdr.set_message_id(reply.getMsgId());
+ // TODO deprecate this field entirely
+ hdr.set_message_id(reply.originator_msg_id()); // This is the _peer's_ process-internal message ID
hdr.set_priority(reply.getPriority());
const auto header_size = hdr.ByteSizeLong();
@@ -227,11 +233,11 @@ public:
template <typename ProtobufType>
class RequestDecoder {
- protobuf::RequestHeader _hdr;
- ::google::protobuf::Arena _arena;
- ProtobufType* _proto_obj;
+ protobuf::RequestHeader _hdr;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
public:
- RequestDecoder(document::ByteBuffer& in_buf)
+ explicit RequestDecoder(document::ByteBuffer& in_buf)
: _arena(),
_proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
{
@@ -246,7 +252,7 @@ public:
}
void transfer_meta_information_to(api::StorageCommand& dest) {
- dest.forceMsgId(_hdr.message_id());
+ dest.force_originator_msg_id(_hdr.message_id()); // TODO deprecate
dest.setPriority(static_cast<uint8_t>(_hdr.priority()));
dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index()));
}
@@ -276,7 +282,9 @@ public:
}
void transfer_meta_information_to(api::StorageReply& dest) {
- dest.forceMsgId(_hdr.message_id());
+ // TODO deprecate this; not currently used internally (message ID is explicitly set
+ // from the originator command instance)
+ dest.force_originator_msg_id(_hdr.message_id());
dest.setPriority(static_cast<uint8_t>(_hdr.priority()));
dest.setResult(api::ReturnCode(static_cast<api::ReturnCode::Result>(_hdr.return_code_id()),
_hdr.return_code_message()));
diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp
index 520f1aa2741..9c73fc7b9ee 100644
--- a/storage/src/vespa/storageapi/message/bucket.cpp
+++ b/storage/src/vespa/storageapi/message/bucket.cpp
@@ -240,7 +240,7 @@ GetBucketDiffCommand::print(std::ostream& out, bool verbose,
}
} else {
out << ", " << _diff.size() << " entries";
- out << ", id " << _msgId;
+ out << ", id " << getMsgId();
}
out << ")";
if (verbose) {
@@ -278,7 +278,7 @@ GetBucketDiffReply::print(std::ostream& out, bool verbose,
}
} else {
out << ", " << _diff.size() << " entries";
- out << ", id " << _msgId;
+ out << ", id " << getMsgId();
}
out << ")";
if (verbose) {
@@ -385,7 +385,7 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose,
}
} else {
out << ", " << _diff.size() << " entries";
- out << ", id " << _msgId;
+ out << ", id " << getMsgId();
}
out << ")";
if (verbose) {
@@ -430,7 +430,7 @@ ApplyBucketDiffReply::print(std::ostream& out, bool verbose,
}
} else {
out << ", " << _diff.size() << " entries";
- out << ", id " << _msgId;
+ out << ", id " << getMsgId();
}
out << ")";
if (verbose) {
diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
index 935b83cdd59..249e08362d4 100644
--- a/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
+++ b/storage/src/vespa/storageapi/messageapi/storagecommand.cpp
@@ -11,14 +11,14 @@ namespace {
}
StorageCommand::StorageCommand(const StorageCommand& other)
- : StorageMessage(other, generateMsgId()),
+ : StorageMessage(other, generateMsgId(), 0),
_timeout(other._timeout),
_sourceIndex(other._sourceIndex)
{
}
StorageCommand::StorageCommand(const MessageType& type, Priority p)
- : StorageMessage(type, generateMsgId()),
+ : StorageMessage(type, generateMsgId(), 0),
_timeout(MAX_TIMEOUT),
_sourceIndex(0xFFFF)
{
@@ -30,8 +30,8 @@ StorageCommand::~StorageCommand() = default;
void
StorageCommand::print(std::ostream& out, bool, const std::string&) const
{
- out << "StorageCommand(" << _type.getName();
- if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority);
+ out << "StorageCommand(" << getType().getName();
+ if (getPriority() != NORMAL) out << ", priority = " << static_cast<int>(getPriority());
if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex;
out << ", timeout = " << vespalib::count_ms(_timeout) << " ms";
out << ")";
diff --git a/storage/src/vespa/storageapi/messageapi/storagecommand.h b/storage/src/vespa/storageapi/messageapi/storagecommand.h
index f1b736a28c8..06817a65b05 100644
--- a/storage/src/vespa/storageapi/messageapi/storagecommand.h
+++ b/storage/src/vespa/storageapi/messageapi/storagecommand.h
@@ -1,13 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::api::StorageCommand
- * @ingroup messageapi
- *
- * @brief Superclass for all storage commands.
+ * Superclass for all storage commands.
*
* A storage command is a storage message you will get a storage reply for.
- *
- * @version $Id$
*/
#pragma once
@@ -24,7 +19,7 @@ class StorageCommand : public StorageMessage {
uint16_t _sourceIndex;
protected:
- explicit StorageCommand(const StorageCommand& other);
+ StorageCommand(const StorageCommand& other);
explicit StorageCommand(const MessageType& type, Priority p = NORMAL);
public:
diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
index c72ece80476..f78e8e7a8f5 100644
--- a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -236,9 +236,10 @@ StorageMessage::generateMsgId() noexcept
return _G_lastMsgId.fetch_add(1, std::memory_order_relaxed);
}
-StorageMessage::StorageMessage(const MessageType& type, Id id) noexcept
+StorageMessage::StorageMessage(const MessageType& type, Id internal_id, Id originator_id) noexcept
: _type(type),
- _msgId(id),
+ _internal_msg_id(internal_id),
+ _originator_msg_id(originator_id),
_address(),
_trace(),
_approxByteSize(50),
@@ -246,9 +247,10 @@ StorageMessage::StorageMessage(const MessageType& type, Id id) noexcept
{
}
-StorageMessage::StorageMessage(const StorageMessage& other, Id id) noexcept
+StorageMessage::StorageMessage(const StorageMessage& other, Id internal_id, Id originator_id) noexcept
: _type(other._type),
- _msgId(id),
+ _internal_msg_id(internal_id),
+ _originator_msg_id(originator_id),
_address(),
_trace(other.getTrace().getLevel()),
_approxByteSize(other._approxByteSize),
diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.h b/storage/src/vespa/storageapi/messageapi/storagemessage.h
index 4649781c1e5..831e44bdba9 100644
--- a/storage/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storage/src/vespa/storageapi/messageapi/storagemessage.h
@@ -1,12 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::api::StorageMessage
- * @ingroup messageapi
- *
- * @brief Superclass for all storage messages.
- *
- * @version $Id$
+ * Superclass for all storage messages.
*/
#pragma once
@@ -70,10 +65,7 @@ namespace storage::api {
using duration = vespalib::duration;
/**
- * @class MessageType
- * @ingroup messageapi
- *
- * @brief This class defines the different message types we have.
+ * This class defines the different message types we have.
*
* This is used to be able to deserialize messages of various classes.
*/
@@ -348,34 +340,45 @@ public:
static const char* getPriorityString(Priority);
private:
- static document::Bucket getDummyBucket() noexcept { return document::Bucket(document::BucketSpace::invalid(), document::BucketId()); }
- mutable std::unique_ptr<TransportContext> _transportContext;
-
-protected:
- static Id generateMsgId() noexcept;
+ static document::Bucket getDummyBucket() noexcept {
+ return {document::BucketSpace::invalid(), document::BucketId()};
+ }
+ mutable std::unique_ptr<TransportContext> _transportContext;
const MessageType& _type;
- Id _msgId;
+ Id _internal_msg_id;
+ Id _originator_msg_id;
StorageMessageAddress _address;
vespalib::Trace _trace;
uint32_t _approxByteSize;
Priority _priority;
- StorageMessage(const MessageType& code, Id id) noexcept;
- StorageMessage(const StorageMessage&, Id id) noexcept;
+protected:
+ static Id generateMsgId() noexcept;
+
+ StorageMessage(const MessageType& code, Id internal_id, Id originator_id) noexcept;
+ StorageMessage(const StorageMessage&, Id internal_id, Id originator_id) noexcept;
public:
StorageMessage& operator=(const StorageMessage&) = delete;
StorageMessage(const StorageMessage&) = delete;
~StorageMessage() override;
- Id getMsgId() const noexcept { return _msgId; }
+ /**
+ * Process-unique internal ID. For replies, corresponds to the message ID of its command.
+ */
+ Id getMsgId() const noexcept { return _internal_msg_id; }
+ /**
+ * If the message has an originator (i.e. sent from a peer), this is that peer's process-unique internal ID.
+ * It must never be used by any _other_ nodes in a context where uniqueness is assumed.
+ */
+ [[nodiscard]] Id originator_msg_id() const noexcept { return _originator_msg_id; }
/**
- * Set the id of this message. Typically used to set the id to a
- * unique value previously generated with the generateMsgId method.
+ * Set the originator ID of this message, to associate local messages with those
+ * generated by a remote peer. Does not affect the process-unique internal ID.
**/
- void forceMsgId(Id msgId) noexcept { _msgId = msgId; }
+ void force_originator_msg_id(Id msgId) noexcept { _originator_msg_id = msgId; }
const MessageType& getType() const noexcept { return _type; }
diff --git a/storage/src/vespa/storageapi/messageapi/storagereply.cpp b/storage/src/vespa/storageapi/messageapi/storagereply.cpp
index a6fefe57e08..8e09943facb 100644
--- a/storage/src/vespa/storageapi/messageapi/storagereply.cpp
+++ b/storage/src/vespa/storageapi/messageapi/storagereply.cpp
@@ -11,7 +11,7 @@ StorageReply::StorageReply(const StorageCommand& cmd)
{}
StorageReply::StorageReply(const StorageCommand& cmd, ReturnCode code)
- : StorageMessage(cmd.getType().getReplyType(), cmd.getMsgId()),
+ : StorageMessage(cmd.getType().getReplyType(), cmd.getMsgId(), cmd.originator_msg_id()),
_result(std::move(code))
{
setPriority(cmd.getPriority());
@@ -32,7 +32,7 @@ StorageReply::~StorageReply() = default;
void
StorageReply::print(std::ostream& out, bool , const std::string& ) const
{
- out << "StorageReply(" << _type.getName() << ", " << _result << ")";
+ out << "StorageReply(" << getType().getName() << ", " << _result << ")";
}
}