diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-09 16:33:49 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-12 15:13:06 +0000 |
commit | 88aef2427e135c269c39b99cd3a7de7351b2608c (patch) | |
tree | 17dc338f9190f07171961079719bf63f81a42710 /storageapi | |
parent | 510580d2620b8f5e0b700aa0ee818961cf61fb60 (diff) |
Add configurable support for unordered merge forwarding
Historically the MergeThrottler component has required a deterministic
forwarding of merges between nodes in strictly increasing distribution
key order. This is to avoid distributed deadlocks caused by ending up
with two or more nodes waiting for each other to release merge resources,
where releasing one depends on releasing the other. This works well,
but has the downside that there's an inherent pressure of merges towards
nodes with lower distribution keys. These often become a bottleneck.
This commit lifts this ordering restriction, by allowing forwarded,
unordered merges to immediately enter the active merge window. By doing
this we remove the deadlock potential, since nodes will longer be waiting
on resources freed by other nodes.
Since the legacy MergeThrottler has a lot of invariant checking around
strictly increasing merge chains, we only allow unordered merges to be
scheduled towards node sets where _all_ nodes are on a Vespa version
that explicitly understands unordered merges (and thus do not self-
obliterate upon seeing one). To communicate this, full bucket fetches
will now piggy-back version-specific feature sets as part of the response
protocol. Distributors then aggregate this information internally.
Diffstat (limited to 'storageapi')
5 files changed, 56 insertions, 6 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index a6021a7cfd2..6a00ddc8d8e 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -410,6 +410,12 @@ TEST_P(StorageProtocolTest, request_bucket_info) { // "Last modified" not counted by operator== for some reason. Testing // separately until we can figure out if this is by design or not. EXPECT_EQ(lastMod, entries[0]._info.getLastModified()); + + if (GetParam().getMajor() >= 7) { + EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining); + } else { + EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining); + } } } @@ -471,12 +477,18 @@ TEST_P(StorageProtocolTest, merge_bucket) { chain.push_back(14); auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); + cmd->set_use_unordered_forwarding(true); auto cmd2 = copyCommand(cmd); EXPECT_EQ(_bucket, cmd2->getBucket()); EXPECT_EQ(nodes, cmd2->getNodes()); EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp()); EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); EXPECT_EQ(chain, cmd2->getChain()); + if (GetParam().getMajor() >= 7) { + EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding()); + } else { + EXPECT_FALSE(cmd2->use_unordered_forwarding()); + } auto reply = std::make_shared<MergeBucketReply>(*cmd); auto reply2 = copyReply(reply); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 34d67fdc00c..7f7ab1d7c0b 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -38,6 +38,7 @@ message MergeBucketRequest { uint64 max_timestamp = 3; repeated MergeNode nodes = 4; repeated uint32 node_chain = 5; + bool unordered_forwarding = 6; } message MergeBucketResponse { @@ -108,8 +109,14 @@ message BucketAndBucketInfo { BucketInfo bucket_info = 2; } +message SupportedNodeFeatures { + bool unordered_merge_chaining = 1; +} + message RequestBucketInfoResponse { repeated BucketAndBucketInfo bucket_infos = 1; + // Only present for full bucket info fetches (not for explicit buckets) + SupportedNodeFeatures supported_node_features = 2; } message NotifyBucketChangeRequest { diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index bb4cb6e24a3..8425294cbbd 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -766,6 +766,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_timestamp(msg.getMaxTimestamp()); req.set_cluster_state_version(msg.getClusterStateVersion()); + req.set_unordered_forwarding(msg.use_unordered_forwarding()); for (uint16_t chain_node : msg.getChain()) { req.add_node_chain(chain_node); } @@ -787,6 +788,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& chain.emplace_back(node); } cmd->setChain(std::move(chain)); + cmd->set_use_unordered_forwarding(req.unordered_forwarding()); return cmd; }); } @@ -999,6 +1001,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId()); set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info); } + // We mark features as available at protocol level. Only included for full bucket fetch responses. + if (msg.full_bucket_fetch()) { + res.mutable_supported_node_features()->set_unordered_merge_chaining(true); + } }); } @@ -1035,6 +1041,11 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); } + if (res.has_supported_node_features()) { + const auto& src_features = res.supported_node_features(); + auto& dest_features = reply->supported_node_features(); + dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining(); + } return reply; }); } diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp index 360db5ea3d7..04a40fbc885 100644 --- a/storageapi/src/vespa/storageapi/message/bucket.cpp +++ b/storageapi/src/vespa/storageapi/message/bucket.cpp @@ -107,7 +107,8 @@ MergeBucketCommand::MergeBucketCommand( _nodes(nodes), _maxTimestamp(maxTimestamp), _clusterStateVersion(clusterStateVersion), - _chain(chain) + _chain(chain), + _use_unordered_forwarding(false) {} MergeBucketCommand::~MergeBucketCommand() = default; @@ -128,6 +129,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in out << _chain[i]; } out << "]"; + if (_use_unordered_forwarding) { + out << " (unordered forwarding)"; + } out << ", reasons to start: " << _reason; out << ")"; if (verbose) { diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h index c24ed55d7a8..d62888e0527 100644 --- a/storageapi/src/vespa/storageapi/message/bucket.h +++ b/storageapi/src/vespa/storageapi/message/bucket.h @@ -118,6 +118,7 @@ private: Timestamp _maxTimestamp; uint32_t _clusterStateVersion; std::vector<uint16_t> _chain; + bool _use_unordered_forwarding; public: MergeBucketCommand(const document::Bucket &bucket, @@ -133,6 +134,10 @@ public: uint32_t getClusterStateVersion() const { return _clusterStateVersion; } void setClusterStateVersion(uint32_t version) { _clusterStateVersion = version; } void setChain(const std::vector<uint16_t>& chain) { _chain = chain; } + void set_use_unordered_forwarding(bool unordered_forwarding) noexcept { + _use_unordered_forwarding = unordered_forwarding; + } + [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket) }; @@ -385,19 +390,30 @@ public: : _bucketId(id), _info(info) {} friend std::ostream& operator<<(std::ostream& os, const Entry&); }; - typedef vespalib::Array<Entry> EntryVector; + struct SupportedNodeFeatures { + bool unordered_merge_chaining = false; + }; + using EntryVector = vespalib::Array<Entry>; private: - EntryVector _buckets; - bool _full_bucket_fetch; - document::BucketId _super_bucket_id; + EntryVector _buckets; + bool _full_bucket_fetch; + document::BucketId _super_bucket_id; + SupportedNodeFeatures _supported_node_features; public: explicit RequestBucketInfoReply(const RequestBucketInfoCommand& cmd); - ~RequestBucketInfoReply(); + ~RequestBucketInfoReply() override; const EntryVector & getBucketInfo() const { return _buckets; } EntryVector & getBucketInfo() { return _buckets; } [[nodiscard]] bool full_bucket_fetch() const noexcept { return _full_bucket_fetch; } + // Only contains useful information if full_bucket_fetch() == true + [[nodiscard]] const SupportedNodeFeatures& supported_node_features() const noexcept { + return _supported_node_features; + } + [[nodiscard]] SupportedNodeFeatures& supported_node_features() noexcept { + return _supported_node_features; + } const document::BucketId& super_bucket_id() const { return _super_bucket_id; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(RequestBucketInfoReply, onRequestBucketInfoReply) |