summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-11-09 16:33:49 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-11-12 15:13:06 +0000
commit88aef2427e135c269c39b99cd3a7de7351b2608c (patch)
tree17dc338f9190f07171961079719bf63f81a42710 /storage
parent510580d2620b8f5e0b700aa0ee818961cf61fb60 (diff)
Add configurable support for unordered merge forwarding
Historically the MergeThrottler component has required a deterministic forwarding of merges between nodes in strictly increasing distribution key order. This is to avoid distributed deadlocks caused by ending up with two or more nodes waiting for each other to release merge resources, where releasing one depends on releasing the other. This works well, but has the downside that there's an inherent pressure of merges towards nodes with lower distribution keys. These often become a bottleneck. This commit lifts this ordering restriction, by allowing forwarded, unordered merges to immediately enter the active merge window. By doing this we remove the deadlock potential, since nodes will no longer be waiting on resources freed by other nodes. Since the legacy MergeThrottler has a lot of invariant checking around strictly increasing merge chains, we only allow unordered merges to be scheduled towards node sets where _all_ nodes are on a Vespa version that explicitly understands unordered merges (and thus do not self-obliterate upon seeing one). To communicate this, full bucket fetches will now piggy-back version-specific feature sets as part of the response protocol. Distributors then aggregate this information internally.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp3
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp17
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp9
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp62
-rw-r--r--storage/src/tests/distributor/mock_tickable_stripe.h4
-rw-r--r--storage/src/tests/distributor/node_supported_features_repo_test.cpp52
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp53
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp6
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp58
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h7
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def6
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h2
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features.h19
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features_repo.cpp37
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features_repo.h37
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h13
-rw-r--r--storage/src/vespa/storage/distributor/stripe_access_guard.h4
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp85
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h36
37 files changed, 550 insertions, 99 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 7348cfc328b..bee7650aebd 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
mergelimitertest.cpp
mergeoperationtest.cpp
multi_thread_stripe_access_guard_test.cpp
+ node_supported_features_repo_test.cpp
nodeinfotest.cpp
nodemaintenancestatstrackertest.cpp
operation_sequencer_test.cpp
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 15aada37c9b..7c97c962a97 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation
const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
abort();
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override {
+ abort();
+ }
};
struct BlockingOperationStarterTest : Test {
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 902dd6454f1..8c2ebc983fa 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_use_unordered_merge_chaining(bool use_unordered) {
+ ConfigBuilder builder;
+ builder.useUnorderedMergeChaining = use_unordered;
+ configure_stripe(builder);
+ }
+
bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
@@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread)
EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
}
+TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_use_unordered_merge_chaining(true);
+ EXPECT_TRUE(getConfig().use_unordered_merge_chaining());
+
+ configure_use_unordered_merge_chaining(false);
+ EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index c5c51e64e68..b96b2dda1cb 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -9,8 +9,10 @@
#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/ideal_state_total_metrics.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
using document::test::makeBucketSpace;
using document::test::makeDocumentBucket;
@@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept {
return _stripe->db_memory_sample_interval();
}
+void
+DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) {
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> new_features;
+ new_features[node] = features;
+ _stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features));
+}
+
const lib::Distribution&
DistributorStripeTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index b1e90821e3b..3226c16aba3 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -26,6 +26,7 @@ class DocumentSelectionParser;
class ExternalOperationHandler;
class IdealStateManager;
class IdealStateMetricSet;
+class NodeSupportedFeatures;
class Operation;
class StripeBucketDBUpdater;
@@ -150,6 +151,7 @@ public:
[[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
[[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
[[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+ void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features);
const lib::Distribution& getDistribution() const;
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 65ee5254193..54bd06c98e0 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
+
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -12,6 +12,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <charconv>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
@@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
}
std::shared_ptr<MergeOperation> setup_minimal_merge_op();
+ std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes);
std::shared_ptr<MergeOperation> setup_simple_merge_op();
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
@@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation>
MergeOperationTest::setup_minimal_merge_op()
{
document::BucketId bucket_id(16, 1);
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2}));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
std::shared_ptr<MergeOperation>
-MergeOperationTest::setup_simple_merge_op()
+MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes)
{
getClock().setAbsoluteTimeInSeconds(10);
@@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op()
enable_cluster_state("distributor:1 storage:3");
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->start(_sender, framework::MilliSecTime(0));
return op;
}
+std::shared_ptr<MergeOperation>
+MergeOperationTest::setup_simple_merge_op()
+{
+ return setup_simple_merge_op({0, 1, 2});
+}
+
void
MergeOperationTest::assert_simple_merge_bucket_command()
{
@@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis
num.erase(pos);
trusted = true;
}
- bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
- api::BucketInfo(1, 2, 3));
+ uint16_t node;
+ [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node);
+ assert(ec == std::errc{});
+ bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3));
bucketDB[i].setTrusted(trusted);
}
std::vector<MergeMetaData> nodes(st.size());
@@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics)
EXPECT_EQ(1, metrics->throttled.getValue());
}
+TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) {
+ setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4");
+ NodeSupportedFeatures with_unordered;
+ with_unordered.unordered_merge_chaining = true;
+
+ set_node_supported_features(1, with_unordered);
+ set_node_supported_features(2, with_unordered);
+
+ auto config = make_config();
+ config->set_use_unordered_merge_chaining(true);
+ configure_stripe(std::move(config));
+
+ // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1).
+ setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [2, 1, 3], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+
+ // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
+ setup_simple_merge_op({1, 2});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
+ "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
+ "reasons to start: ) => 2",
+ _sender.getLastCommand(true));
+
+ _sender.clear();
+
+ config = make_config();
+ config->set_use_unordered_merge_chaining(false);
+ configure_stripe(std::move(config));
+
+ // If config is not enabled, should send ordered even if nodes support the feature.
+ setup_simple_merge_op({2, 1});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
+ "cluster state version: 0, nodes: [2, 1], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h
index 38fc0c599a2..ec2f978c029 100644
--- a/storage/src/tests/distributor/mock_tickable_stripe.h
+++ b/storage/src/tests/distributor/mock_tickable_stripe.h
@@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe {
void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
void clear_read_only_bucket_repo_databases() override { abort(); }
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo>) override {
+ abort();
+ }
+
void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
diff --git a/storage/src/tests/distributor/node_supported_features_repo_test.cpp b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
new file mode 100644
index 00000000000..990e0fc50a3
--- /dev/null
+++ b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/distributor/node_supported_features_repo.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct NodeSupportedFeaturesRepoTest : Test {
+ using FeatureMap = vespalib::hash_map<uint16_t, NodeSupportedFeatures>;
+ NodeSupportedFeaturesRepo _repo;
+
+ static NodeSupportedFeatures set_features() noexcept {
+ NodeSupportedFeatures f;
+ f.unordered_merge_chaining = true;
+ return f;
+ }
+
+ static NodeSupportedFeatures unset_features() noexcept {
+ return {};
+ }
+};
+
+TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) {
+ EXPECT_EQ(_repo.node_supported_features(0), unset_features());
+ EXPECT_EQ(_repo.node_supported_features(12345), unset_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(0), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(1), set_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ fm[1] = unset_features();
+ new_repo = new_repo->make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(1), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+}
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index fe8a607c9ae..3ed5e9f4a8d 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/top_level_bucket_db_updater.h>
#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
@@ -119,6 +120,21 @@ public:
invalid_bucket_count));
}
+ void fake_bucket_reply(const lib::ClusterState &state,
+ const api::StorageCommand &cmd,
+ uint32_t bucket_count,
+ const std::function<void(api::RequestBucketInfoReply&)>& reply_decorator)
+ {
+ ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO);
+ const api::StorageMessageAddress& address(*cmd.getAddress());
+ auto reply = make_fake_bucket_reply(state,
+ dynamic_cast<const RequestBucketInfoCommand &>(cmd),
+ address.getIndex(),
+ bucket_count, 0);
+ reply_decorator(*reply);
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
void send_fake_reply_for_single_bucket_request(
const api::RequestBucketInfoCommand& rbi)
{
@@ -232,7 +248,7 @@ public:
}
}
- api::StorageMessageAddress storage_address(uint16_t node) {
+ static api::StorageMessageAddress storage_address(uint16_t node) {
static vespalib::string _storage("storage");
return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node);
}
@@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
for (uint32_t i = 0; i < 3; ++i) {
- nodes.push_back(api::MergeBucketCommand::Node(i));
+ nodes.emplace_back(i);
}
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
@@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
+TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
+ lib::ClusterState state("distributor:1 storage:3");
+ set_cluster_state(state);
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+
+ // Known feature sets are initially empty.
+ auto stripes = distributor_stripes();
+ for (auto* s : stripes) {
+ for (uint16_t i : {0, 1, 2}) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ }
+ }
+
+ ASSERT_EQ(expected_msgs, _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); i++) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
+ dummy_buckets_to_return, [i](auto& reply) noexcept {
+ // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Node 0 does not support the fanciness.
+ if (i > 0) {
+ reply.supported_node_features().unordered_merge_chaining = true;
+ }
+ }));
+ }
+
+ // Node features should be propagated to all stripes
+ for (auto* s : stripes) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ }
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 636a09d1f6e..2a61141865a 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api:
void
TopLevelDistributorTestUtil::close()
{
- _component.reset(0);
- if (_distributor.get()) {
+ _component.reset();
+ if (_distributor) {
_stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose
_distributor->onClose();
}
_sender.clear();
- _node.reset(0);
+ _node.reset();
_config = getStandardConfig(false);
}
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e8f8e425af4..0f844ab6b4f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -52,15 +52,18 @@ struct MergeBuilder {
~MergeBuilder();
MergeBuilder& nodes(uint16_t n0) {
+ _nodes.clear();
_nodes.push_back(n0);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
_nodes.push_back(n2);
@@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+ void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
@@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
// Send down another merge with non-empty chain. It should _not_ be busy bounced
// (if limits disabled) as it has already been accepted into another node's merge window.
{
- std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
- cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ if (!unordered_fwd) {
+ cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ } else {
+ cmd->setChain(std::vector<uint16_t>({2})); // Forwarded from node 2, i.e. _not_ the lowest index
+ }
+ cmd->set_use_unordered_forwarding(unordered_fwd);
_topLinks[1]->sendDown(cmd);
}
}
@@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
}
+TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
+ // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+
+ // Unordered merge is immediately forwarded to the next node
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ auto fwd = std::dynamic_pointer_cast<api::MergeBucketCommand>(
+ _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET));
+ ASSERT_TRUE(fwd);
+ EXPECT_TRUE(fwd->use_unordered_forwarding());
+ EXPECT_EQ(fwd->getChain(), std::vector<uint16_t>({2, 1}));
+}
+
+TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full)
+{
+ fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely
+ {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {0}});
+ auto cmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
+ cmd->set_use_unordered_forwarding(true);
+ _topLinks[1]->sendDown(cmd);
+ }
+ waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({1, 0, 2});
{
std::vector<uint16_t> chain;
chain.push_back(0);
@@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
// Send cycled merge which will be executed
{
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
- chain.push_back(2);
+ std::vector<uint16_t> chain({0, 1, 2});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain);
_topLinks[1]->sendDown(cmd);
@@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
for (size_t i = 0; i < max_pending + queued_count; ++i) {
- _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
+ .nodes(throttler_index, throttler_index + 1)
+ .create());
}
-
// Wait till we have max_pending merge forwards and queued_count enqueued.
_topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index a23d00ee6a3..8a40899165f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -50,6 +50,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_prioritize_global_bucket_merges(true),
_enable_revert(true),
_implicitly_clear_priority_on_schedule(false),
+ _use_unordered_merge_chaining(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -171,6 +172,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups;
_enable_revert = config.enableRevert;
_implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule;
+ _use_unordered_merge_chaining = config.useUnorderedMergeChaining;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 7b4e082d1ed..ea1aca17116 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -267,6 +267,12 @@ public:
[[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept {
return _implicitly_clear_priority_on_schedule;
}
+ void set_use_unordered_merge_chaining(bool unordered) noexcept {
+ _use_unordered_merge_chaining = unordered;
+ }
+ [[nodiscard]] bool use_unordered_merge_chaining() const noexcept {
+ return _use_unordered_merge_chaining;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -324,6 +330,7 @@ private:
bool _prioritize_global_bucket_merges;
bool _enable_revert;
bool _implicitly_clear_priority_on_schedule;
+ bool _use_unordered_merge_chaining;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 8a9fdf74802..990a0530ecd 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -286,3 +286,9 @@ num_distributor_stripes int default=0 restart
## bucket due to being blocked by concurrent operations. This avoids potential head-of-line
## blocking of later buckets in the priority database.
implicitly_clear_bucket_priority_on_schedule bool default=false
+
+## Enables sending merges that are not forwarded between content nodes in strictly
+## increasing node key order. Even if this config is set to true, unordered merges
+## will only be sent if _all_ nodes involved in a given merge have previously
+## reported (as part of bucket info fetching) that they support the unordered merge feature.
+use_unordered_merge_chaining bool default=false
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 52171406ebf..470bfb69abb 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -32,6 +32,7 @@ vespa_add_library(storage_distributor
messagetracker.cpp
min_replica_provider.cpp
multi_threaded_stripe_access_guard.cpp
+ node_supported_features_repo.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
operation_sequencer.cpp
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 934c5e364d8..bceb4ed1377 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -17,7 +17,7 @@ class DistributorBucketSpaceRepo;
*/
class DistributorOperationContext {
public:
- virtual ~DistributorOperationContext() {}
+ virtual ~DistributorOperationContext() = default;
virtual api::Timestamp generate_unique_timestamp() = 0;
virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0;
virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 9f565686216..50c70306d92 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -6,6 +6,7 @@
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "node_supported_features_repo.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "storage_node_up_states.h"
@@ -68,6 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()),
+ _node_supported_features_repo(std::make_shared<const NodeSupportedFeaturesRepo>()),
_metricLock(),
_maintenanceStats(),
_bucketSpacesStats(),
@@ -872,6 +874,12 @@ DistributorStripe::clear_read_only_bucket_repo_databases()
}
void
+DistributorStripe::update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo)
+{
+ _node_supported_features_repo = std::move(features_repo);
+}
+
+void
DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const
{
ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
@@ -889,4 +897,10 @@ DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutpu
bucket_db_updater().report_delayed_single_bucket_requests(xos);
}
+const NodeSupportedFeaturesRepo&
+DistributorStripe::node_supported_features_repo() const noexcept
+{
+ return *_node_supported_features_repo; // Set to an empty repo at construction; assumes later updates never pass a null pointer.
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 5ba682d46e3..ce6a2071efd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -160,6 +160,8 @@ public:
return *_bucketIdHasher;
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override;
+
StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
const StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
IdealStateManager& ideal_state_manager() { return _idealStateManager; }
@@ -283,6 +285,7 @@ private:
void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
void clear_read_only_bucket_repo_databases() override;
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) override;
void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
@@ -338,6 +341,7 @@ private:
framework::ThreadWaitInfo _tickResult;
BucketDBMetricUpdater _bucketDBMetricUpdater;
std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
+ std::shared_ptr<const NodeSupportedFeaturesRepo> _node_supported_features_repo;
mutable std::mutex _metricLock;
/**
* Maintenance stats for last completed database scan iteration.
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index f2d2afb8fee..aa0a2289727 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -277,6 +277,12 @@ DistributorStripeComponent::storage_node_is_up(document::BucketSpace bucket_spac
return ns.getState().oneOf(storage_node_up_states());
}
+const NodeSupportedFeaturesRepo&
+DistributorStripeComponent::node_supported_features_repo() const noexcept
+{
+ return _distributor.node_supported_features_repo();
+}
+
std::unique_ptr<document::select::Node>
DistributorStripeComponent::parse_selection(const vespalib::string& selection) const
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
index b274e21ac7c..5bcf9eec76d 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
@@ -70,7 +70,7 @@ public:
*/
void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
- uint32_t update_flags = 0) override {
+ uint32_t update_flags) override {
update_bucket_database(bucket,
toVector<BucketCopy>(changed_node),
update_flags);
@@ -79,9 +79,9 @@ public:
/**
* Adds the given copies to the bucket database.
*/
- virtual void update_bucket_database(const document::Bucket& bucket,
- const std::vector<BucketCopy>& changed_nodes,
- uint32_t update_flags = 0) override;
+ void update_bucket_database(const document::Bucket& bucket,
+ const std::vector<BucketCopy>& changed_nodes,
+ uint32_t update_flags) override;
/**
* Removes a copy from the given bucket from the bucket database.
@@ -165,6 +165,8 @@ public:
return getDistributor().getBucketIdHasher();
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override;
+
// Implements DocumentSelectionParser
std::unique_ptr<document::select::Node> parse_selection(const vespalib::string& selection) const override;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index 4f39dd7e5bc..dfed59499c6 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -16,6 +16,7 @@ namespace storage {
namespace storage::distributor {
class DistributorMetricSet;
+class NodeSupportedFeaturesRepo;
class PendingMessageTracker;
/**
@@ -61,6 +62,7 @@ public:
virtual const DistributorConfiguration& getConfig() const = 0;
virtual ChainedMessageSender& getMessageSender() = 0;
virtual const BucketGcTimeCalculator::BucketIdHasher& getBucketIdHasher() const = 0;
+ virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
index 5919261ab43..d6f4e5694f6 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
@@ -16,6 +16,7 @@ namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
class PendingMessageTracker;
+class NodeSupportedFeaturesRepo;
/**
* Interface with functionality that is used when handling distributor stripe operations.
@@ -57,6 +58,7 @@ public:
virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0;
virtual const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const = 0;
+ virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index 1a44b79ac3a..b00e4ce3cba 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -132,6 +132,14 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
});
}
+void MultiThreadedStripeAccessGuard::update_node_supported_features_repo(
+ std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo)
+{
+ for_each_stripe([&](TickableStripe& stripe) { // Hand the same repo instance to each stripe visited.
+ stripe.update_node_supported_features_repo(features_repo); // Parameter is by value, so each stripe gets its own shared_ptr copy.
+ });
+}
+
void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
for_each_stripe([&](TickableStripe& stripe) {
stripe.report_bucket_db_status(bucket_space, out);
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index 53799fa338b..c52a01fdded 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -54,6 +54,8 @@ public:
void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
void clear_read_only_bucket_repo_databases() override;
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) override;
+
void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
PendingOperationStats pending_operation_stats() const override;
void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h
new file mode 100644
index 00000000000..fb9cc68e970
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features.h
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace storage::distributor {
+
+/**
+ * Collection of distinct features supported by a particular content node.
+ *
+ * Communicated to a distributor via bucket info exchanges. All features
+ * are initially expected to be unsupported.
+ */
+struct NodeSupportedFeatures {
+ bool unordered_merge_chaining = false;
+
+ bool operator==(const NodeSupportedFeatures&) const noexcept = default;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp
new file mode 100644
index 00000000000..e125f360cec
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "node_supported_features_repo.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+
+namespace storage::distributor {
+
+NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo() = default;
+
+NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo(
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> features,
+ PrivateCtorTag)
+ : _node_features(std::move(features))
+{}
+
+NodeSupportedFeaturesRepo::~NodeSupportedFeaturesRepo() = default;
+
+const NodeSupportedFeatures&
+NodeSupportedFeaturesRepo::node_supported_features(uint16_t node_idx) const noexcept
+{
+ static const NodeSupportedFeatures all_features_unset; // Returned for nodes without a known mapping.
+ const auto known = _node_features.find(node_idx);
+ return (known == _node_features.end()) ? all_features_unset : known->second;
+}
+
+std::shared_ptr<const NodeSupportedFeaturesRepo>
+NodeSupportedFeaturesRepo::make_union_of(const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& node_features) const
+{
+ auto merged = _node_features; // Deliberate copy; the mappings held by this repo are const and stay untouched.
+ // Entries from node_features replace any mapping already present for the same node.
+ for (const auto& [node_idx, features] : node_features) {
+ merged[node_idx] = features;
+ }
+ return std::make_shared<NodeSupportedFeaturesRepo>(std::move(merged), PrivateCtorTag{});
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.h b/storage/src/vespa/storage/distributor/node_supported_features_repo.h
new file mode 100644
index 00000000000..cc40c27b8e2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.h
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "node_supported_features.h"
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <memory>
+
+namespace storage::distributor {
+
+/**
+ * Repo of known mappings from node distribution key to feature set supported by
+ * the content node with the given distribution key.
+ *
+ * Entirely immutable; copy-on-write via make_union_of().
+ */
+class NodeSupportedFeaturesRepo {
+ const vespalib::hash_map<uint16_t, NodeSupportedFeatures> _node_features; // Node distribution key -> features; const, so fixed at construction.
+ struct PrivateCtorTag {}; // Private tag type required by the map-taking constructor below.
+public:
+ NodeSupportedFeaturesRepo(); // Creates a repo without any known node mappings.
+ // Intended for use by make_union_of() (hence the private tag type); public so that std::make_shared can invoke it.
+ NodeSupportedFeaturesRepo(vespalib::hash_map<uint16_t, NodeSupportedFeatures> features, PrivateCtorTag);
+ ~NodeSupportedFeaturesRepo();
+
+ // Returns supported node features for node with distribution key node_idx, or a default feature set
+ // with all features unset if node has no known mapping.
+ [[nodiscard]] const NodeSupportedFeatures& node_supported_features(uint16_t node_idx) const noexcept;
+
+ // Returns a new repo instance containing the union key->features set of self and node_features.
+ // If there is a duplicate mapping between the two, the features in node_features take precedence
+ // and will be stored in the new repo.
+ [[nodiscard]] std::shared_ptr<const NodeSupportedFeaturesRepo>
+ make_union_of(const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& node_features) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index f951a880e5d..d220a71966f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -137,9 +138,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
getBucketId(),
_limiter,
nodes);
- for (uint32_t i=0; i<nodes.size(); ++i) {
- _mnodes.push_back(api::MergeBucketCommand::Node(
- nodes[i]._nodeIndex, nodes[i]._sourceOnly));
+ for (const auto& node : nodes) {
+ _mnodes.emplace_back(node._nodeIndex, node._sourceOnly);
}
if (_mnodes.size() > 1) {
@@ -148,11 +148,16 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_mnodes,
_manager->operation_context().generate_unique_timestamp(),
clusterState.getVersion());
-
- // Due to merge forwarding/chaining semantics, we must always send
- // the merge command to the lowest indexed storage node involved in
- // the merge in order to avoid deadlocks.
- std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator());
+ const bool may_send_unordered = (_manager->operation_context().distributor_config().use_unordered_merge_chaining()
+ && all_involved_nodes_support_unordered_merge_chaining());
+ if (!may_send_unordered) {
+ // Due to merge forwarding/chaining semantics, we must always send
+ // the merge command to the lowest indexed storage node involved in
+ // the merge in order to avoid deadlocks.
+ std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator());
+ } else {
+ msg->set_use_unordered_forwarding(true);
+ }
LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(),
_mnodes[0].index);
@@ -262,7 +267,7 @@ void
MergeOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
- if (_removeOperation.get()) {
+ if (_removeOperation) {
if (_removeOperation->onReceiveInternal(msg)) {
_ok = _removeOperation->ok();
if (!_ok) {
@@ -277,7 +282,7 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender,
return;
}
- api::MergeBucketReply& reply(dynamic_cast<api::MergeBucketReply&>(*msg));
+ auto& reply = dynamic_cast<api::MergeBucketReply&>(*msg);
LOG(debug,
"Merge operation for bucket %s finished",
getBucketId().toString().c_str());
@@ -367,6 +372,16 @@ bool MergeOperation::is_global_bucket_merge() const noexcept {
return getBucket().getBucketSpace() == document::FixedBucketSpaces::global_space();
}
+bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const noexcept {
+ // A single node lacking the feature (or having no known feature mapping) vetoes unordered chaining.
+ const auto& repo = _manager->operation_context().node_supported_features_repo();
+ const auto& involved_nodes = getNodes();
+ return std::all_of(involved_nodes.begin(), involved_nodes.end(),
+ [&repo](uint16_t node_idx) {
+ return repo.node_supported_features(node_idx).unordered_merge_chaining;
+ });
+}
+
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 832c0f99681..014bae842fa 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -64,6 +64,7 @@ private:
void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
DistributorStripeMessageSender& sender);
bool is_global_bucket_merge() const noexcept;
+ bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
MergeBucketMetricSet* get_merge_metrics();
};
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 1c1c9f4a431..8183b013668 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlstream.hpp>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <climits>
#include <vespa/log/bufferedlogger.h>
@@ -44,7 +45,8 @@ PendingClusterState::PendingClusterState(
_clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
_isVersionedTransition(true),
_bucketOwnershipTransfer(false),
- _pendingTransitions()
+ _pendingTransitions(),
+ _node_features()
{
logConstructionInformation();
initializeBucketSpaceTransitions(false, outdatedNodesMap);
@@ -67,7 +69,8 @@ PendingClusterState::PendingClusterState(
_clusterStateVersion(0),
_isVersionedTransition(false),
_bucketOwnershipTransfer(true),
- _pendingTransitions()
+ _pendingTransitions(),
+ _node_features()
{
logConstructionInformation();
initializeBucketSpaceTransitions(true, OutdatedNodesMap());
@@ -287,6 +290,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace);
assert(transitionIter != _pendingTransitions.end());
transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node);
+
+ update_node_supported_features_from_reply(iter->second.node, *reply);
+
_sentMessages.erase(iter);
return true;
@@ -304,21 +310,6 @@ PendingClusterState::resendDelayedMessages() {
}
}
-std::string
-PendingClusterState::requestNodesToString() const
-{
- std::ostringstream ost;
- for (uint32_t i = 0; i < _requestedNodes.size(); ++i) {
- if (_requestedNodes[i]) {
- if (ost.str().length() > 0) {
- ost << ",";
- }
- ost << i;
- }
- }
- return ost.str();
-}
-
void
PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard)
{
@@ -366,4 +357,14 @@ PendingClusterState::getPrevClusterStateBundleString() const {
return _prevClusterStateBundle.getBaselineClusterState()->toString();
}
+void
+PendingClusterState::update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply)
+{
+ const auto& src_feat = reply.supported_node_features();
+ NodeSupportedFeatures dest_feat;
+ dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ // Latest reply wins (insert() would keep the first); harmless across bucket spaces since features are independent of bucket space.
+ _node_features[node] = dest_feat;
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 0d07730d9ee..1a2f8901b47 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "node_supported_features.h"
#include "pending_bucket_space_db_transition_entry.h"
#include "clusterinformation.h"
#include <vespa/storage/common/storagelink.h>
@@ -9,6 +10,7 @@
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/util/xmlserializable.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include "outdated_nodes_map.h"
#include <unordered_map>
#include <deque>
@@ -151,9 +153,14 @@ public:
// Get pending transition for a specific bucket space. Only used by unit test.
PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
+ // May be a subset of the nodes in the cluster, depending on how many nodes were consulted
+ // as part of the pending cluster state. Caller must take care to aggregate features.
+ const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& gathered_node_supported_features() const noexcept {
+ return _node_features;
+ }
+
void printXml(vespalib::XmlOutputStream&) const override;
Summary getSummary() const;
- std::string requestNodesToString() const;
private:
// With 100ms resend timeout, this requires a particular node to have failed
@@ -170,7 +177,7 @@ private:
DistributorMessageSender& sender,
const BucketSpaceStateMap& bucket_space_states,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
- const OutdatedNodesMap &outdatedNodesMap,
+ const OutdatedNodesMap& outdatedNodesMap,
api::Timestamp creationTimestamp);
/**
@@ -213,6 +220,7 @@ private:
std::string getNewClusterStateBundleString() const;
std::string getPrevClusterStateBundleString() const;
void update_reply_failure_statistics(const api::ReturnCode& result, const BucketSpaceAndNode& source);
+ void update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply);
std::shared_ptr<api::SetSystemStateCommand> _cmd;
@@ -233,6 +241,7 @@ private:
bool _isVersionedTransition;
bool _bucketOwnershipTransfer;
std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> _node_features;
};
}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
index bfc53c0ed82..2ed40cfcf2e 100644
--- a/storage/src/vespa/storage/distributor/stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -20,6 +20,8 @@ namespace vespalib::xml { class XmlOutputStream; }
namespace storage::distributor {
+class NodeSupportedFeaturesRepo;
+
/**
* A stripe access guard guarantees that the holder of a guard can access underlying
* stripes via it in a thread safe manner. In particular, while any access guard is
@@ -57,6 +59,8 @@ public:
virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
virtual void clear_read_only_bucket_repo_databases() = 0;
+ virtual void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) = 0;
+
struct PendingOperationStats {
size_t external_load_operations;
size_t maintenance_operations;
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
index d58b1e2e6aa..e458043ac64 100644
--- a/storage/src/vespa/storage/distributor/tickable_stripe.h
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -15,6 +15,8 @@ namespace vespalib::xml { class XmlOutputStream; }
namespace storage::distributor {
+class NodeSupportedFeaturesRepo;
+
/**
* A tickable stripe is the minimal binding glue between the stripe's worker thread and
* the actual implementation. Primarily allows for easier testing without having to
@@ -58,6 +60,8 @@ public:
virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
virtual void clear_read_only_bucket_repo_databases() = 0;
+ virtual void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) = 0;
+
// Functions used for state reporting
virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0;
virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0;
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 8fc6d7576c9..613f0f6ce09 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -7,6 +7,7 @@
#include "top_level_distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
+#include "node_supported_features_repo.h"
#include "simpleclusterinformation.h"
#include "stripe_access_guard.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
@@ -47,11 +48,12 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n
_chained_sender(chained_sender),
_outdated_nodes_map(),
_transition_timer(_node_ctx.clock()),
+ _node_supported_features_repo(std::make_shared<const NodeSupportedFeaturesRepo>()),
_stale_reads_enabled(false)
{
// FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer.
- bootstrap_distribution_config(bootstrap_distribution);
+ bootstrap_distribution_config(std::move(bootstrap_distribution));
}
TopLevelBucketDBUpdater::~TopLevelBucketDBUpdater() = default;
@@ -393,6 +395,10 @@ TopLevelBucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard
guard.notify_distribution_change_enabled();
}
+ _node_supported_features_repo = _node_supported_features_repo->make_union_of(
+ _pending_cluster_state->gathered_node_supported_features());
+ guard.update_node_supported_features_repo(_node_supported_features_repo);
+
guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle());
_pending_cluster_state.reset();
_outdated_nodes_map.clear();
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index f35991c20f3..b1065e708a4 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -30,6 +30,7 @@ struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
class ClusterStateBundleActivationListener;
class DistributorInterface;
+class NodeSupportedFeaturesRepo;
class StripeAccessor;
class StripeAccessGuard;
@@ -122,6 +123,7 @@ private:
ChainedMessageSender& _chained_sender;
OutdatedNodesMap _outdated_nodes_map;
framework::MilliSecTimer _transition_timer;
+ std::shared_ptr<const NodeSupportedFeaturesRepo> _node_supported_features_repo;
std::atomic<bool> _stale_reads_enabled;
};
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 05e50492206..07ddfd82d0b 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -113,30 +113,40 @@ MergeThrottler::MergeOperationMetrics::MergeOperationMetrics(const std::string&
}
MergeThrottler::MergeOperationMetrics::~MergeOperationMetrics() = default;
-MergeThrottler::MergeNodeSequence::MergeNodeSequence(
- const api::MergeBucketCommand& cmd,
- uint16_t thisIndex)
+MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex)
: _cmd(cmd),
_sortedNodes(cmd.getNodes()),
- _sortedIndex(std::numeric_limits<std::size_t>::max()),
- _thisIndex(thisIndex)
+ _sortedIndex(UINT16_MAX),
+ _unordered_index(UINT16_MAX),
+ _thisIndex(thisIndex),
+ _use_unordered_forwarding(cmd.use_unordered_forwarding())
{
// Sort the node vector so that we can find out if we're the
// last node in the chain or if we should forward the merge
std::sort(_sortedNodes.begin(), _sortedNodes.end(), NodeComparator());
- assert(!_sortedNodes.empty());
- for (std::size_t i = 0; i < _sortedNodes.size(); ++i) {
+ assert(!_sortedNodes.empty() && (_sortedNodes.size() < UINT16_MAX));
+ for (uint16_t i = 0; i < static_cast<uint16_t>(_sortedNodes.size()); ++i) {
if (_sortedNodes[i].index == _thisIndex) {
_sortedIndex = i;
break;
}
}
+ const auto& nodes = unordered_nodes();
+ for (uint16_t i = 0; i < static_cast<uint16_t>(nodes.size()); ++i) {
+ if (nodes[i].index == _thisIndex) {
+ _unordered_index = i;
+ break;
+ }
+ }
}
uint16_t
MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
{
assert(_cmd.getChain().size() < _sortedNodes.size());
+ if (_use_unordered_forwarding) {
+ return unordered_nodes()[_cmd.getChain().size() + 1].index;
+ }
// assert(_sortedNodes[_cmd.getChain().size()].index == _thisIndex);
if (_sortedNodes[_cmd.getChain().size()].index != _thisIndex) {
// Some added paranoia output
@@ -153,7 +163,11 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const
{
if (_cmd.getChain().size() != _sortedNodes.size()) return false;
- for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) {
+ if (_use_unordered_forwarding) {
+ return true; // Expect chain to be correct if size matches node sequence size. TODO can't we always do this?
+ }
+
+ for (size_t i = 0; i < _cmd.getChain().size(); ++i) {
if (_cmd.getChain()[i] != _sortedNodes[i].index) {
return false;
}
@@ -162,10 +176,10 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const
}
bool
-MergeThrottler::MergeNodeSequence::chainContainsIndex(uint16_t idx) const
+MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
{
- for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) {
- if (_cmd.getChain()[i] == idx) {
+ for (size_t i = 0; i < _cmd.getChain().size(); ++i) {
+ if (_cmd.getChain()[i] == _thisIndex) {
return true;
}
}
@@ -358,6 +372,7 @@ MergeThrottler::forwardCommandToNode(
fwdMerge->setSourceIndex(mergeCmd.getSourceIndex());
fwdMerge->setPriority(mergeCmd.getPriority());
fwdMerge->setTimeout(mergeCmd.getTimeout());
+ fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding());
msgGuard.sendUp(fwdMerge);
}
@@ -374,7 +389,7 @@ api::StorageMessage::SP
MergeThrottler::getNextQueuedMerge()
{
if (_queue.empty()) {
- return api::StorageMessage::SP();
+ return {};
}
auto iter = _queue.begin();
@@ -385,7 +400,7 @@ MergeThrottler::getNextQueuedMerge()
}
void
-MergeThrottler::enqueueMerge(
+MergeThrottler::enqueue_merge_for_later_processing(
const api::StorageMessage::SP& msg,
MessageGuard& msgGuard)
{
@@ -395,9 +410,10 @@ MergeThrottler::enqueueMerge(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
+ // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued
const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty();
_queue.emplace(msg, _queueSequence++, is_forwarded_merge);
- _metrics->queueSize.set(_queue.size());
+ _metrics->queueSize.set(static_cast<int64_t>(_queue.size()));
}
bool
@@ -682,11 +698,29 @@ bool MergeThrottler::backpressure_mode_active() const {
return backpressure_mode_active_no_lock();
}
-bool MergeThrottler::allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept {
- // We let any merge through that has already passed through at least one other node's merge
- // window, as that has already taken up a logical resource slot on all those nodes. Busy-bouncing
- // a merge at that point would undo a great amount of thumb-twiddling and waiting.
- return (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty());
+bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept {
+ // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
+ // See comment in may_allow_into_queue() for rationale.
+ return (cmd.use_unordered_forwarding() && !cmd.getChain().empty());
+}
+
+bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
+ // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
+ // Consider the following scenario, with two nodes (node 0 and node 1), each with a low window size of 1 (low
+ // limit chosen for demonstration purposes, but is entirely generalizable):
+ // 1. Node 0 receives merge M_x for nodes [0, 1], places in active window, forwards to node 1
+ // 2. Node 1 receives merge M_y for nodes [1, 0], places in active window, forwards to node 0
+ // 3. Node 0 receives merge M_y from node 1. Active window is full, so places in queue
+ // 4. Node 1 receives merge M_x from node 0. Active window is full, so places in queue
+ // 5. Neither M_x nor M_y will ever complete since they're waiting for resources that cannot be
+ // freed up before they themselves complete. Classic deadlock(tm).
+ //
+ // We do, however, allow enqueueing unordered merges that come straight from the distributor, as
+ // those cannot cause a deadlock at that point in time.
+ return (((_queue.size() < _maxQueueSize)
+ || (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty()))
+ && (!cmd.use_unordered_forwarding()
+ || (cmd.use_unordered_forwarding() && cmd.getChain().empty())));
}
// Must be run from worker thread
@@ -716,10 +750,10 @@ MergeThrottler::handleMessageDown(
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
- } else if (canProcessNewMerge()) {
+ } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) {
processNewMergeCommand(msg, msgGuard);
- } else if ((_queue.size() < _maxQueueSize) || allow_merge_with_queue_full(mergeCmd)) {
- enqueueMerge(msg, msgGuard); // Queue for later processing
+ } else if (may_allow_into_queue(mergeCmd)) {
+ enqueue_merge_for_later_processing(msg, msgGuard);
} else {
// No more room at the inn. Return BUSY so that the
// distributor will wait a bit before retrying
@@ -773,7 +807,7 @@ MergeThrottler::validateNewMerge(
<< _component.getIndex()
<< ", which is not in its forwarding chain";
LOG(error, "%s", oss.str().data());
- } else if (mergeCmd.getChain().size() >= nodeSeq.getSortedNodes().size()) {
+ } else if (mergeCmd.getChain().size() >= nodeSeq.unordered_nodes().size()) {
// Chain is full but we haven't seen the merge! This means
// the node has probably gone down with a merge it previously
// forwarded only now coming back to haunt it.
@@ -781,7 +815,7 @@ MergeThrottler::validateNewMerge(
<< " is not in node's internal state, but has a "
<< "full chain, meaning it cannot be forwarded.";
LOG(debug, "%s", oss.str().data());
- } else if (nodeSeq.chainContainsIndex(nodeSeq.getThisNodeIndex())) {
+ } else if (nodeSeq.chain_contains_this_node()) {
oss << mergeCmd.toString()
<< " is not in node's internal state, but contains "
<< "this node in its non-full chain. This should not happen!";
@@ -831,7 +865,9 @@ MergeThrottler::processNewMergeCommand(
// If chain is empty and this node is not the lowest
// index in the nodeset, immediately execute. Required for
// backwards compatibility with older distributor versions.
+ // TODO remove this
if (mergeCmd.getChain().empty()
+ && !mergeCmd.use_unordered_forwarding()
&& (nodeSeq.getSortedNodes()[0].index != _component.getIndex()))
{
LOG(debug, "%s has empty chain and was sent to node that "
@@ -1039,7 +1075,6 @@ bool
MergeThrottler::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
{
-
LOG(debug,
"New cluster state arrived with version %u, flushing "
"all outdated queued merges",
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index da301172a3a..c115d36ad89 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -161,7 +161,7 @@ private:
ActiveMergeMap _merges;
MergePriorityQueue _queue;
- std::size_t _maxQueueSize;
+ size_t _maxQueueSize;
mbus::StaticThrottlePolicy::UP _throttlePolicy;
uint64_t _queueSequence; // TODO: move into a stable priority queue class
mutable std::mutex _messageLock;
@@ -220,7 +220,7 @@ public:
std::mutex& getStateLock() { return _stateLock; }
Metrics& getMetrics() { return *_metrics; }
- std::size_t getMaxQueueSize() const { return _maxQueueSize; }
+ size_t getMaxQueueSize() const { return _maxQueueSize; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
private:
@@ -230,17 +230,18 @@ private:
struct MergeNodeSequence {
const api::MergeBucketCommand& _cmd;
std::vector<api::MergeBucketCommand::Node> _sortedNodes;
- std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
+ uint16_t _sortedIndex; // Index of current storage node in the sorted node sequence
+ uint16_t _unordered_index;
const uint16_t _thisIndex; // Index of the current storage node
+ bool _use_unordered_forwarding;
MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
- std::size_t getSortedIndex() const { return _sortedIndex; }
const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
return _sortedNodes;
}
bool isIndexUnknown() const {
- return (_sortedIndex == std::numeric_limits<std::size_t>::max());
+ return (_sortedIndex == UINT16_MAX);
}
/**
* This node is the merge executor if it's the first element in the
@@ -252,11 +253,17 @@ private:
uint16_t getExecutorNodeIndex() const{
return _cmd.getNodes()[0].index;
}
- bool isLastNode() const {
- return (_sortedIndex == _sortedNodes.size() - 1);
+ const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
+ return _cmd.getNodes();
}
- bool chainContainsIndex(uint16_t idx) const;
- uint16_t getThisNodeIndex() const { return _thisIndex; }
+ [[nodiscard]] bool isLastNode() const {
+ if (!_use_unordered_forwarding) {
+ return (_sortedIndex == _sortedNodes.size() - 1);
+ } else {
+ return (_unordered_index == (unordered_nodes().size() - 1));
+ }
+ }
+ [[nodiscard]] bool chain_contains_this_node() const noexcept;
/**
* Gets node to forward to in strictly increasing order.
*/
@@ -339,7 +346,7 @@ private:
* @return Highest priority waiting merge or null SP if queue is empty
*/
api::StorageMessage::SP getNextQueuedMerge();
- void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
+ void enqueue_merge_for_later_processing(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
/**
* @return true if throttle policy says at least one additional
@@ -347,12 +354,13 @@ private:
*/
bool canProcessNewMerge() const;
- bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
+ [[nodiscard]] bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
- bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
- bool backpressure_mode_active_no_lock() const;
+ [[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
+ [[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
- bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept;
+ [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,