aboutsummaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahooinc.com> 2021-11-09 16:33:49 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com> 2021-11-12 15:13:06 +0000
commit 88aef2427e135c269c39b99cd3a7de7351b2608c (patch)
tree 17dc338f9190f07171961079719bf63f81a42710 /storageapi
parent 510580d2620b8f5e0b700aa0ee818961cf61fb60 (diff)
Add configurable support for unordered merge forwarding
Historically the MergeThrottler component has required a deterministic forwarding of merges between nodes in strictly increasing distribution key order. This is to avoid distributed deadlocks caused by ending up with two or more nodes waiting for each other to release merge resources, where releasing one depends on releasing the other. This works well, but has the downside that there's an inherent pressure of merges towards nodes with lower distribution keys. These often become a bottleneck. This commit lifts this ordering restriction, by allowing forwarded, unordered merges to immediately enter the active merge window. By doing this we remove the deadlock potential, since nodes will no longer be waiting on resources freed by other nodes. Since the legacy MergeThrottler has a lot of invariant checking around strictly increasing merge chains, we only allow unordered merges to be scheduled towards node sets where _all_ nodes are on a Vespa version that explicitly understands unordered merges (and thus do not self-obliterate upon seeing one). To communicate this, full bucket fetches will now piggy-back version-specific feature sets as part of the response protocol. Distributors then aggregate this information internally.
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp12
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto7
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp11
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.cpp6
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.h26
5 files changed, 56 insertions, 6 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index a6021a7cfd2..6a00ddc8d8e 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -410,6 +410,12 @@ TEST_P(StorageProtocolTest, request_bucket_info) {
// "Last modified" not counted by operator== for some reason. Testing
// separately until we can figure out if this is by design or not.
EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
+
+ if (GetParam().getMajor() >= 7) {
+ EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
+ } else {
+ EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining);
+ }
}
}
@@ -471,12 +477,18 @@ TEST_P(StorageProtocolTest, merge_bucket) {
chain.push_back(14);
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ cmd->set_use_unordered_forwarding(true);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
EXPECT_EQ(nodes, cmd2->getNodes());
EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
+ if (GetParam().getMajor() >= 7) {
+ EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+ } else {
+ EXPECT_FALSE(cmd2->use_unordered_forwarding());
+ }
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 34d67fdc00c..7f7ab1d7c0b 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -38,6 +38,7 @@ message MergeBucketRequest {
uint64 max_timestamp = 3;
repeated MergeNode nodes = 4;
repeated uint32 node_chain = 5;
+ bool unordered_forwarding = 6;
}
message MergeBucketResponse {
@@ -108,8 +109,14 @@ message BucketAndBucketInfo {
BucketInfo bucket_info = 2;
}
+message SupportedNodeFeatures {
+ bool unordered_merge_chaining = 1;
+}
+
message RequestBucketInfoResponse {
repeated BucketAndBucketInfo bucket_infos = 1;
+ // Only present for full bucket info fetches (not for explicit buckets)
+ SupportedNodeFeatures supported_node_features = 2;
}
message NotifyBucketChangeRequest {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index bb4cb6e24a3..8425294cbbd 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -766,6 +766,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
+ req.set_unordered_forwarding(msg.use_unordered_forwarding());
for (uint16_t chain_node : msg.getChain()) {
req.add_node_chain(chain_node);
}
@@ -787,6 +788,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
chain.emplace_back(node);
}
cmd->setChain(std::move(chain));
+ cmd->set_use_unordered_forwarding(req.unordered_forwarding());
return cmd;
});
}
@@ -999,6 +1001,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
}
+ // We mark features as available at protocol level. Only included for full bucket fetch responses.
+ if (msg.full_bucket_fetch()) {
+ res.mutable_supported_node_features()->set_unordered_merge_chaining(true);
+ }
});
}
@@ -1035,6 +1041,11 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
}
+ if (res.has_supported_node_features()) {
+ const auto& src_features = res.supported_node_features();
+ auto& dest_features = reply->supported_node_features();
+ dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ }
return reply;
});
}
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 360db5ea3d7..04a40fbc885 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -107,7 +107,8 @@ MergeBucketCommand::MergeBucketCommand(
_nodes(nodes),
_maxTimestamp(maxTimestamp),
_clusterStateVersion(clusterStateVersion),
- _chain(chain)
+ _chain(chain),
+ _use_unordered_forwarding(false)
{}
MergeBucketCommand::~MergeBucketCommand() = default;
@@ -128,6 +129,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
out << _chain[i];
}
out << "]";
+ if (_use_unordered_forwarding) {
+ out << " (unordered forwarding)";
+ }
out << ", reasons to start: " << _reason;
out << ")";
if (verbose) {
diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h
index c24ed55d7a8..d62888e0527 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.h
+++ b/storageapi/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ private:
Timestamp _maxTimestamp;
uint32_t _clusterStateVersion;
std::vector<uint16_t> _chain;
+ bool _use_unordered_forwarding;
public:
MergeBucketCommand(const document::Bucket &bucket,
@@ -133,6 +134,10 @@ public:
uint32_t getClusterStateVersion() const { return _clusterStateVersion; }
void setClusterStateVersion(uint32_t version) { _clusterStateVersion = version; }
void setChain(const std::vector<uint16_t>& chain) { _chain = chain; }
+ void set_use_unordered_forwarding(bool unordered_forwarding) noexcept {
+ _use_unordered_forwarding = unordered_forwarding;
+ }
+ [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};
@@ -385,19 +390,30 @@ public:
: _bucketId(id), _info(info) {}
friend std::ostream& operator<<(std::ostream& os, const Entry&);
};
- typedef vespalib::Array<Entry> EntryVector;
+ struct SupportedNodeFeatures {
+ bool unordered_merge_chaining = false;
+ };
+ using EntryVector = vespalib::Array<Entry>;
private:
- EntryVector _buckets;
- bool _full_bucket_fetch;
- document::BucketId _super_bucket_id;
+ EntryVector _buckets;
+ bool _full_bucket_fetch;
+ document::BucketId _super_bucket_id;
+ SupportedNodeFeatures _supported_node_features;
public:
explicit RequestBucketInfoReply(const RequestBucketInfoCommand& cmd);
- ~RequestBucketInfoReply();
+ ~RequestBucketInfoReply() override;
const EntryVector & getBucketInfo() const { return _buckets; }
EntryVector & getBucketInfo() { return _buckets; }
[[nodiscard]] bool full_bucket_fetch() const noexcept { return _full_bucket_fetch; }
+ // Only contains useful information if full_bucket_fetch() == true
+ [[nodiscard]] const SupportedNodeFeatures& supported_node_features() const noexcept {
+ return _supported_node_features;
+ }
+ [[nodiscard]] SupportedNodeFeatures& supported_node_features() noexcept {
+ return _supported_node_features;
+ }
const document::BucketId& super_bucket_id() const { return _super_bucket_id; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(RequestBucketInfoReply, onRequestBucketInfoReply)