diff options
Diffstat (limited to 'storage/src/tests/distributor')
10 files changed, 198 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 7348cfc328b..bee7650aebd 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST mergelimitertest.cpp mergeoperationtest.cpp multi_thread_stripe_access_guard_test.cpp + node_supported_features_repo_test.cpp nodeinfotest.cpp nodemaintenancestatstrackertest.cpp operation_sequencer_test.cpp diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 15aada37c9b..7c97c962a97 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override { abort(); } + const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override { + abort(); + } }; struct BlockingOperationStarterTest : Test { diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 902dd6454f1..8c2ebc983fa 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { configure_stripe(builder); } + void configure_use_unordered_merge_chaining(bool use_unordered) { + ConfigBuilder builder; + builder.useUnorderedMergeChaining = use_unordered; + configure_stripe(builder); + } + bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); } @@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread) EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); } +TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config) +{ + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_use_unordered_merge_chaining(true); + EXPECT_TRUE(getConfig().use_unordered_merge_chaining()); + + configure_use_unordered_merge_chaining(false); + EXPECT_FALSE(getConfig().use_unordered_merge_chaining()); +} + } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index c5c51e64e68..b96b2dda1cb 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -9,8 +9,10 @@ #include <vespa/storage/distributor/distributor_stripe_component.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/ideal_state_total_metrics.h> +#include <vespa/storage/distributor/node_supported_features_repo.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/stllike/hash_map.hpp> using document::test::makeBucketSpace; using document::test::makeDocumentBucket; @@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept { return _stripe->db_memory_sample_interval(); } +void +DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) { + vespalib::hash_map<uint16_t, NodeSupportedFeatures> new_features; + new_features[node] = features; + _stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features)); +} + const lib::Distribution& DistributorStripeTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index b1e90821e3b..3226c16aba3 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -26,6 +26,7 @@ class DocumentSelectionParser; class ExternalOperationHandler; class IdealStateManager; class IdealStateMetricSet; +class NodeSupportedFeatures; class Operation; class StripeBucketDBUpdater; @@ -150,6 +151,7 @@ public: [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features); const lib::Distribution& getDistribution() const; diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 65ee5254193..54bd06c98e0 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/dummystoragelink.h> + #include <tests/distributor/distributor_stripe_test_util.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/test/make_document_bucket.h> @@ -12,6 +12,7 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <charconv> using document::test::makeDocumentBucket; using document::test::makeBucketSpace; @@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { } std::shared_ptr<MergeOperation> setup_minimal_merge_op(); + std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes); std::shared_ptr<MergeOperation> setup_simple_merge_op(); void assert_simple_merge_bucket_command(); void assert_simple_delete_bucket_command(); @@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation> MergeOperationTest::setup_minimal_merge_op() { document::BucketId bucket_id(16, 1); - auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2))); + auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2})); op->setIdealStateManager(&getIdealStateManager()); return op; } std::shared_ptr<MergeOperation> -MergeOperationTest::setup_simple_merge_op() +MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes) { getClock().setAbsoluteTimeInSeconds(10); @@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op() enable_cluster_state("distributor:1 storage:3"); - auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes)); op->setIdealStateManager(&getIdealStateManager()); op->start(_sender, framework::MilliSecTime(0)); return op; } +std::shared_ptr<MergeOperation> +MergeOperationTest::setup_simple_merge_op() +{ + return setup_simple_merge_op({0, 1, 2}); +} + void MergeOperationTest::assert_simple_merge_bucket_command() { @@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis num.erase(pos); trusted = true; } - bucketDB[i] = BucketCopy(0, atoi(num.c_str()), - api::BucketInfo(1, 2, 3)); + uint16_t node; + [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node); + assert(ec == std::errc{}); + bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3)); bucketDB[i].setTrusted(trusted); } std::vector<MergeMetaData> nodes(st.size()); @@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics) EXPECT_EQ(1, metrics->throttled.getValue()); } +TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) { + setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4"); + NodeSupportedFeatures with_unordered; + with_unordered.unordered_merge_chaining = true; + + set_node_supported_features(1, with_unordered); + set_node_supported_features(2, with_unordered); + + auto config = make_config(); + config->set_use_unordered_merge_chaining(true); + configure_stripe(std::move(config)); + + // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1). + setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [2, 1, 3], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); + + // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2) + setup_simple_merge_op({1, 2}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, " + "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), " + "reasons to start: ) => 2", + _sender.getLastCommand(true)); + + _sender.clear(); + + config = make_config(); + config->set_use_unordered_merge_chaining(false); + configure_stripe(std::move(config)); + + // If config is not enabled, should send ordered even if nodes support the feature. + setup_simple_merge_op({2, 1}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, " + "cluster state version: 0, nodes: [2, 1], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h index 38fc0c599a2..ec2f978c029 100644 --- a/storage/src/tests/distributor/mock_tickable_stripe.h +++ b/storage/src/tests/distributor/mock_tickable_stripe.h @@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe { void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); } void clear_read_only_bucket_repo_databases() override { abort(); } + void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo>) override { + abort(); + } + void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); } StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); } void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); } diff --git a/storage/src/tests/distributor/node_supported_features_repo_test.cpp b/storage/src/tests/distributor/node_supported_features_repo_test.cpp new file mode 100644 index 00000000000..990e0fc50a3 --- /dev/null +++ b/storage/src/tests/distributor/node_supported_features_repo_test.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/distributor/node_supported_features_repo.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::distributor { + +struct NodeSupportedFeaturesRepoTest : Test { + using FeatureMap = vespalib::hash_map<uint16_t, NodeSupportedFeatures>; + NodeSupportedFeaturesRepo _repo; + + static NodeSupportedFeatures set_features() noexcept { + NodeSupportedFeatures f; + f.unordered_merge_chaining = true; + return f; + } + + static NodeSupportedFeatures unset_features() noexcept { + return {}; + } +}; + +TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) { + EXPECT_EQ(_repo.node_supported_features(0), unset_features()); + EXPECT_EQ(_repo.node_supported_features(12345), unset_features()); +} + +TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) { + FeatureMap fm; + fm[1] = set_features(); + fm[60] = set_features(); + auto new_repo = _repo.make_union_of(fm); + EXPECT_EQ(new_repo->node_supported_features(0), unset_features()); + EXPECT_EQ(new_repo->node_supported_features(1), set_features()); + EXPECT_EQ(new_repo->node_supported_features(60), set_features()); +} + +TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) { + FeatureMap fm; + fm[1] = set_features(); + fm[60] = set_features(); + auto new_repo = _repo.make_union_of(fm); + fm[1] = unset_features(); + new_repo = new_repo->make_union_of(fm); + EXPECT_EQ(new_repo->node_supported_features(1), unset_features()); + EXPECT_EQ(new_repo->node_supported_features(60), set_features()); +} + +} diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index fe8a607c9ae..3ed5e9f4a8d 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/top_level_bucket_db_updater.h> #include <vespa/storage/distributor/bucket_space_distribution_context.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/node_supported_features_repo.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/storage/distributor/outdated_nodes_map.h> #include <vespa/storage/storageutil/distributorstatecache.h> @@ -119,6 +120,21 @@ public: invalid_bucket_count)); } + void fake_bucket_reply(const lib::ClusterState &state, + const api::StorageCommand &cmd, + uint32_t bucket_count, + const std::function<void(api::RequestBucketInfoReply&)>& reply_decorator) + { + ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO); + const api::StorageMessageAddress& address(*cmd.getAddress()); + auto reply = make_fake_bucket_reply(state, + dynamic_cast<const RequestBucketInfoCommand &>(cmd), + address.getIndex(), + bucket_count, 0); + reply_decorator(*reply); + bucket_db_updater().onRequestBucketInfoReply(reply); + } + void send_fake_reply_for_single_bucket_request( const api::RequestBucketInfoCommand& rbi) { @@ -232,7 +248,7 @@ public: } } - api::StorageMessageAddress storage_address(uint16_t node) { + static api::StorageMessageAddress storage_address(uint16_t node) { static vespalib::string _storage("storage"); return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node); } @@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); for (uint32_t i = 0; i < 3; ++i) { - nodes.push_back(api::MergeBucketCommand::Node(i)); + nodes.emplace_back(i); } api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); @@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl EXPECT_FALSE(def_rs.is_routable()); } +TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) { + lib::ClusterState state("distributor:1 storage:3"); + set_cluster_state(state); + uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; + + // Known feature sets are initially empty. + auto stripes = distributor_stripes(); + for (auto* s : stripes) { + for (uint16_t i : {0, 1, 2}) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); + } + } + + ASSERT_EQ(expected_msgs, _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); i++) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), + dummy_buckets_to_return, [i](auto& reply) noexcept { + // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported. + // Node 0 does not support the fanciness. + if (i > 0) { + reply.supported_node_features().unordered_merge_chaining = true; + } + })); + } + + // Node features should be propagated to all stripes + for (auto* s : stripes) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); + } +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 636a09d1f6e..2a61141865a 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api: void TopLevelDistributorTestUtil::close() { - _component.reset(0); - if (_distributor.get()) { + _component.reset(); + if (_distributor) { _stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose _distributor->onClose(); } _sender.clear(); - _node.reset(0); + _node.reset(); _config = getStandardConfig(false); } |