diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-31 16:06:09 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-08-31 16:08:21 +0000 |
commit | eaea32bf295daf9ed15540f5f501aaee1a56dfe8 (patch) | |
tree | d3fa038cdd2bee56100556c241d79b88939272bf /storage | |
parent | 8cee6798d0fc29bbcf3fc7d5bb2ece08a73dc747 (diff) |
Migrate more unit tests to top-level distributor test suite
Diffstat (limited to 'storage')
5 files changed, 220 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index d34178caf8b..573362c9ef4 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -371,6 +371,25 @@ TEST_F(DistributorStripeTest, priority_config_is_propagated_to_distributor_confi EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets)); } +TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { + setup_stripe(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); + // Force new state into being the pending state. According to the initial + // state we own the bucket, but according to the pending state, we do + // not. This must be handled correctly by the database update code. + simulate_set_pending_cluster_state("storage:10 distributor:10"); + + document::BucketId nonOwnedBucket(16, 3); + EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state()); + EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned()); + + std::vector<BucketCopy> copies; + copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); + operation_context().update_bucket_database(makeDocumentBucket(nonOwnedBucket), copies, + DatabaseUpdate::CREATE_IF_NONEXISTING); + + EXPECT_EQ("NONEXISTING", dumpBucket(nonOwnedBucket)); +} + TEST_F(DistributorStripeTest, added_db_buckets_without_gc_timestamp_implicitly_get_current_time) { setup_stripe(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp index 3d6b96e985d..1efe1119519 100644 --- a/storage/src/tests/distributor/legacy_distributor_test.cpp +++ b/storage/src/tests/distributor/legacy_distributor_test.cpp @@ -280,7 +280,7 @@ TEST_F(LegacyDistributorTest, operations_generated_and_started_without_duplicate // Migrated to DistributorStripeTest -// TODO STRIPE also need to impl/test cross-stripe cluster state changes +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) { setupDistributor(Redundancy(1), NodeCount(2), "storage:1 .0.s:d distributor:1"); @@ -334,7 +334,7 @@ TEST_F(LegacyDistributorTest, handle_unknown_maintenance_reply) { } } -// TODO STRIPE -> generic, non distr/stripe test +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, contains_time_statement) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -416,8 +416,7 @@ public: } -// TODO STRIPE -> stripe test -// TODO STRIPE need to impl/test cross-stripe status requests +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, tick_processes_status_requests) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -446,8 +445,7 @@ TEST_F(LegacyDistributorTest, tick_processes_status_requests) { EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); } -// TODO STRIPE -> distributor test since it owns metric hook -// TODO STRIPE need to impl/test cross-stripe metrics aggregation +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, metric_update_hook_updates_pending_maintenance_metrics) { setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); // To ensure we count all operations, not just those fitting within the @@ -494,7 +492,7 @@ TEST_F(LegacyDistributorTest, metric_update_hook_updates_pending_maintenance_met } } -// TODO STRIPE -> distributor test since it uses the distributor metric update hook +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -569,8 +567,8 @@ TEST_F(LegacyDistributorTest, priority_config_is_propagated_to_distributor_confi EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets)); } -// TODO STRIPE -> stripe test (that sets pending cluster state directly) -// TODO STRIPE -> distributor test (that uses top-level BucketDBUpdater::onSetSystemState) +// Migrated to DistributorStripeTest +// Explicit cluster state edge test added in TopLevelDistributorTest::cluster_state_lifecycle_is_propagated_to_stripes TEST_F(LegacyDistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { setupDistributor(Redundancy(1), NodeCount(10), "storage:2 distributor:2"); lib::ClusterState newState("storage:10 distributor:10"); @@ -1262,7 +1260,7 @@ TEST_F(LegacyDistributorTest, wanted_split_bit_count_is_lower_bounded) { EXPECT_EQ(getConfig().getMinimalBucketSplit(), 8); } -// TODO: migrate to TopLevelDistributorTest +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) { set_num_distributor_stripes(4); createLinks(); @@ -1291,7 +1289,7 @@ TEST_F(LegacyDistributorTest, host_info_sent_immediately_once_all_stripes_first_ EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); } -// TODO: migrate to TopLevelDistributorTest +// Migrated to TopLevelDistributorTest TEST_F(LegacyDistributorTest, non_bootstrap_host_info_send_request_delays_sending) { set_num_distributor_stripes(4); createLinks(); diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 8572420eaba..4968e8e6be3 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -8,7 +8,6 @@ #include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> #include <tests/distributor/top_level_distributor_test_util.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/config/config-stor-distributormanager.h> @@ -19,7 +18,6 @@ #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/distributor_stripe_pool.h> #include <vespa/storage/distributor/distributor_stripe_thread.h> -#include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/metrics/updatehook.h> #include <thread> @@ -48,6 +46,8 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { close(); } + void reply_to_1_node_bucket_info_fetch_with_n_buckets(size_t n); + // Simple type aliases to make interfacing with certain utility functions // easier. Note that this is only for readability and does not provide any // added type safety. @@ -70,9 +70,15 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return posted_msgs.str(); } - void tick_distributor_n_times(uint32_t n) { + void tick_distributor_and_stripes_n_times(uint32_t n) { + for (uint32_t i = 0; i < n; ++i) { + tick(false); + } + } + + void tick_top_level_distributor_n_times(uint32_t n) { for (uint32_t i = 0; i < n; ++i) { - tick(); + tick(true); } } @@ -91,6 +97,21 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { Distributor::MetricUpdateHook distributor_metric_update_hook() { return _distributor->_metricUpdateHook; } + + uint64_t db_sample_interval_sec() const noexcept { + // Sampling interval is equal across stripes, so just grab the first one and go with it. + return std::chrono::duration_cast<std::chrono::seconds>( + distributor_stripes().front()->db_memory_sample_interval()).count(); + } + + static std::vector<document::BucketSpace> bucket_spaces() { + return {document::FixedBucketSpaces::default_space(), document::FixedBucketSpaces::global_space()}; + } + + size_t explicit_node_state_reply_send_invocations() const noexcept { + return _node->getNodeStateUpdater().explicit_node_state_reply_send_invocations(); + } + }; TopLevelDistributorTest::TopLevelDistributorTest() @@ -236,7 +257,7 @@ TEST_F(TopLevelDistributorTest, metric_update_hook_updates_pending_maintenance_m add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=200/300/400/t,1=200/300/400/t"); // Go many full scanner rounds to check that metrics are set, not added to existing. - tick_distributor_n_times(50); + tick_distributor_and_stripes_n_times(50); // By this point, no hook has been called so the metrics have not been set. using MO = MaintenanceOperation; @@ -265,4 +286,150 @@ TEST_F(TopLevelDistributorTest, metric_update_hook_updates_pending_maintenance_m } } +TEST_F(TopLevelDistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { + fake_clock().setAbsoluteTimeInSeconds(1000); + + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a,1=2/2/2"); + tick_distributor_and_stripes_n_times(10); + + std::mutex l; + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + auto* m = total_distributor_metrics().mutable_dbs.memory_usage.getMetric("used_bytes"); + ASSERT_TRUE(m != nullptr); + auto last_used = m->getLongValue("last"); + EXPECT_GT(last_used, 0); + + // Add another bucket to the DB. This should increase the underlying used number of + // bytes, but this should not be aggregated into the metrics until the sampling time + // interval has passed. Instead, old metric gauge values should be preserved. + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); + + const auto sample_interval_sec = db_sample_interval_sec(); + fake_clock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. + tick_distributor_and_stripes_n_times(50); + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + + m = total_distributor_metrics().mutable_dbs.memory_usage.getMetric("used_bytes"); + auto now_used = m->getLongValue("last"); + EXPECT_EQ(now_used, last_used); + + fake_clock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec + 1); + tick_distributor_and_stripes_n_times(10); + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + + m = total_distributor_metrics().mutable_dbs.memory_usage.getMetric("used_bytes"); + now_used = m->getLongValue("last"); + EXPECT_GT(now_used, last_used); +} + +void TopLevelDistributorTest::reply_to_1_node_bucket_info_fetch_with_n_buckets(size_t n) { + ASSERT_EQ(bucket_spaces().size(), _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); ++i) { + ASSERT_EQ(api::MessageType::REQUESTBUCKETINFO, _sender.command(i)->getType()); + auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i)); + auto reply = bucket_req.makeReply(); + if (bucket_req.getBucketSpace() == FixedBucketSpaces::default_space()) { + auto& bucket_reply = dynamic_cast<api::RequestBucketInfoReply&>(*reply); + for (size_t j = 1; j <= n; ++j) { + bucket_reply.getBucketInfo().push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, j), api::BucketInfo(20, 10, 12, 50, 60, true, true))); + } + } + handle_top_level_message(std::move(reply)); + } + _sender.commands().clear(); +} + +TEST_F(TopLevelDistributorTest, cluster_state_lifecycle_is_propagated_to_stripes) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 .0.s:d distributor:1"); + // Node 0 goes from Down -> Up, should get 1 RequestBucketInfo per bucket space. + receive_set_system_state_command("storage:2 distributor:1"); + tick_top_level_distributor_n_times(1); // Process enqueued message + // All stripes should now be in pending state + for (auto* s : distributor_stripes()) { + for (auto space : bucket_spaces()) { + EXPECT_TRUE(s->getBucketSpaceRepo().get(space).has_pending_cluster_state()); + } + } + // Respond with some buckets that will be evenly distributed across the stripes. + reply_to_1_node_bucket_info_fetch_with_n_buckets(10); + tick_top_level_distributor_n_times(1); // Process enqueued replies + + std::vector<document::BucketId> inserted_buckets; + // Pending state should now be cleared for all stripes + for (auto* s : distributor_stripes()) { + for (auto space : bucket_spaces()) { + EXPECT_FALSE(s->getBucketSpaceRepo().get(space).has_pending_cluster_state()); + } + auto& def_space = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + def_space.getBucketDatabase().acquire_read_guard()->for_each([&](uint64_t key, [[maybe_unused]] const auto& entry) { + inserted_buckets.emplace_back(document::BucketId::keyToBucketId(key)); + }); + } + // All buckets should be present. We track as vectors rather than sets to detect any cross-stripe duplicates. + std::vector<document::BucketId> expected_buckets; + for (size_t i = 1; i <= 10; ++i) { + expected_buckets.emplace_back(16, i); + } + std::sort(expected_buckets.begin(), expected_buckets.end()); + std::sort(inserted_buckets.begin(), inserted_buckets.end()); + EXPECT_EQ(inserted_buckets, expected_buckets); +} + +TEST_F(TopLevelDistributorTest, host_info_sent_immediately_once_all_stripes_first_reported) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + ASSERT_EQ(_num_distributor_stripes, 4); + fake_clock().setAbsoluteTimeInSeconds(1000); + + tick_top_level_distributor_n_times(1); + EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); // Nothing yet + _distributor->notify_stripe_wants_to_send_host_info(1); + _distributor->notify_stripe_wants_to_send_host_info(2); + _distributor->notify_stripe_wants_to_send_host_info(3); + + tick_top_level_distributor_n_times(1); + // Still nothing. Missing initial report from stripe 0 + EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); + + _distributor->notify_stripe_wants_to_send_host_info(0); + tick_top_level_distributor_n_times(1); + // All stripes have reported in, it's time to party! + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); + + // No further sends if stripes haven't requested it yet. + fake_clock().setAbsoluteTimeInSeconds(2000); + tick_top_level_distributor_n_times(10); + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); +} + +TEST_F(TopLevelDistributorTest, non_bootstrap_host_info_send_request_delays_sending) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + ASSERT_EQ(_num_distributor_stripes, 4); + fake_clock().setAbsoluteTimeInSeconds(1000); + + for (uint16_t i = 0; i < 4; ++i) { + _distributor->notify_stripe_wants_to_send_host_info(i); + } + tick_top_level_distributor_n_times(1); + // Bootstrap case + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); + + // Stripe 1 suddenly really wants to tell the cluster controller something again + _distributor->notify_stripe_wants_to_send_host_info(1); + tick_top_level_distributor_n_times(1); + // But its cry for attention is not yet honored since the delay hasn't passed. + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); + + fake_clock().addMilliSecondsToTime(999); + tick_top_level_distributor_n_times(1); + // 1 sec delay has still not passed + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); + + fake_clock().addMilliSecondsToTime(1); + tick_top_level_distributor_n_times(1); + // But now it has + EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index e7670a78a51..657d49bbffb 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -9,6 +9,7 @@ #include <vespa/storage/distributor/distributor_stripe_component.h> #include <vespa/storage/distributor/distributor_stripe_pool.h> #include <vespa/storage/distributor/distributor_stripe_thread.h> +#include <vespa/storage/distributor/distributor_total_metrics.h> #include <vespa/storage/common/bucket_stripe_utils.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/text/stringtokenizer.h> @@ -210,6 +211,13 @@ TopLevelDistributorTestUtil::total_ideal_state_metrics() const return *_distributor->_ideal_state_total_metrics; } +const DistributorMetricSet& +TopLevelDistributorTestUtil::total_distributor_metrics() const +{ + assert(_distributor->_total_metrics); + return *_distributor->_total_metrics; +} + const storage::distributor::DistributorNodeContext& TopLevelDistributorTestUtil::node_context() const { return _distributor->distributor_component(); @@ -221,7 +229,7 @@ TopLevelDistributorTestUtil::operation_context() { } bool -TopLevelDistributorTestUtil::tick() { +TopLevelDistributorTestUtil::tick(bool only_tick_top_level) { framework::ThreadWaitInfo res( framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN); { @@ -230,8 +238,10 @@ TopLevelDistributorTestUtil::tick() { } res.merge(_distributor->doNonCriticalTick(0)); bool did_work = !res.waitWanted(); - for (auto& s : *_stripe_pool) { - did_work |= s->stripe().tick(); + if (!only_tick_top_level) { + for (auto& s : *_stripe_pool) { + did_work |= s->stripe().tick(); + } } return did_work; } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index 0cc736464a1..9a5260bad7e 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -18,6 +18,7 @@ namespace framework { struct TickingThreadPool; } namespace distributor { class Distributor; +class DistributorMetricSet; class DistributorNodeContext; class DistributorStripe; class DistributorStripeComponent; @@ -53,16 +54,21 @@ public: BucketDBUpdater& bucket_db_updater(); const IdealStateMetricSet& total_ideal_state_metrics() const; + const DistributorMetricSet& total_distributor_metrics() const; const storage::distributor::DistributorNodeContext& node_context() const; storage::distributor::DistributorStripeOperationContext& operation_context(); std::vector<DistributorStripe*> distributor_stripes() const; - bool tick(); + bool tick(bool only_tick_top_level = false); const DistributorConfig& current_distributor_config() const; void reconfigure(const DistributorConfig&); + framework::defaultimplementation::FakeClock& fake_clock() noexcept { + return _node->getClock(); + } + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space); const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only |