diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-09 16:33:49 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-12 15:13:06 +0000 |
commit | 88aef2427e135c269c39b99cd3a7de7351b2608c (patch) | |
tree | 17dc338f9190f07171961079719bf63f81a42710 /storage/src/tests/distributor/mergeoperationtest.cpp | |
parent | 510580d2620b8f5e0b700aa0ee818961cf61fb60 (diff) |
Add configurable support for unordered merge forwarding
Historically the MergeThrottler component has required a deterministic
forwarding of merges between nodes in strictly increasing distribution
key order. This is to avoid distributed deadlocks caused by ending up
with two or more nodes waiting for each other to release merge resources,
where releasing one depends on releasing the other. This works well,
but has the downside that there's an inherent pressure of merges towards
nodes with lower distribution keys. These often become a bottleneck.
This commit lifts this ordering restriction, by allowing forwarded,
unordered merges to immediately enter the active merge window. By doing
this we remove the deadlock potential, since nodes will longer be waiting
on resources freed by other nodes.
Since the legacy MergeThrottler has a lot of invariant checking around
strictly increasing merge chains, we only allow unordered merges to be
scheduled towards node sets where _all_ nodes are on a Vespa version
that explicitly understands unordered merges (and thus do not self-
obliterate upon seeing one). To communicate this, full bucket fetches
will now piggy-back version-specific feature sets as part of the response
protocol. Distributors then aggregate this information internally.
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/mergeoperationtest.cpp | 62 |
1 files changed, 56 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 65ee5254193..54bd06c98e0 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/dummystoragelink.h> + #include <tests/distributor/distributor_stripe_test_util.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/test/make_document_bucket.h> @@ -12,6 +12,7 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <charconv> using document::test::makeDocumentBucket; using document::test::makeBucketSpace; @@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { } std::shared_ptr<MergeOperation> setup_minimal_merge_op(); + std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes); std::shared_ptr<MergeOperation> setup_simple_merge_op(); void assert_simple_merge_bucket_command(); void assert_simple_delete_bucket_command(); @@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation> MergeOperationTest::setup_minimal_merge_op() { document::BucketId bucket_id(16, 1); - auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2))); + auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2})); op->setIdealStateManager(&getIdealStateManager()); return op; } std::shared_ptr<MergeOperation> -MergeOperationTest::setup_simple_merge_op() +MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes) { getClock().setAbsoluteTimeInSeconds(10); @@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op() enable_cluster_state("distributor:1 storage:3"); - auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes)); op->setIdealStateManager(&getIdealStateManager()); op->start(_sender, framework::MilliSecTime(0)); return op; } +std::shared_ptr<MergeOperation> +MergeOperationTest::setup_simple_merge_op() +{ + return setup_simple_merge_op({0, 1, 2}); +} + void MergeOperationTest::assert_simple_merge_bucket_command() { @@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis num.erase(pos); trusted = true; } - bucketDB[i] = BucketCopy(0, atoi(num.c_str()), - api::BucketInfo(1, 2, 3)); + uint16_t node; + [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node); + assert(ec == std::errc{}); + bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3)); bucketDB[i].setTrusted(trusted); } std::vector<MergeMetaData> nodes(st.size()); @@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics) EXPECT_EQ(1, metrics->throttled.getValue()); } +TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) { + setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4"); + NodeSupportedFeatures with_unordered; + with_unordered.unordered_merge_chaining = true; + + set_node_supported_features(1, with_unordered); + set_node_supported_features(2, with_unordered); + + auto config = make_config(); + config->set_use_unordered_merge_chaining(true); + configure_stripe(std::move(config)); + + // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1). + setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [2, 1, 3], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); + + // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2) + setup_simple_merge_op({1, 2}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, " + "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), " + "reasons to start: ) => 2", + _sender.getLastCommand(true)); + + _sender.clear(); + + config = make_config(); + config->set_use_unordered_merge_chaining(false); + configure_stripe(std::move(config)); + + // If config is not enabled, should send ordered even if nodes support the feature. + setup_simple_merge_op({2, 1}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, " + "cluster state version: 0, nodes: [2, 1], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); +} + } // storage::distributor |