diff options
Diffstat (limited to 'storage/src/tests/distributor')
8 files changed, 82 insertions, 49 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 97fccf58901..7e8fec3b83a 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); std::string distConfig(getDistConfig6Nodes4Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); // No replies received yet, still no recovery mode. - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); ASSERT_EQ(messageCount(6), _sender.commands().size()); uint32_t numBuckets = 10; @@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) // Pending cluster state (i.e. distribution) has been enabled, which should // cause recovery mode to be entered. - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); } namespace { @@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until "version:2 distributor:1 storage:4", n_buckets, 4)); // Version should not be switched over yet - EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(0), mutable_default_db().size()); EXPECT_EQ(uint64_t(0), mutable_global_db().size()); EXPECT_FALSE(activate_cluster_state_version(2)); - EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 7958306db5f..2944348e639 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -175,6 +175,10 @@ struct DistributorTest : Test, DistributorTestUtil { return _distributor->handleMessage(msg); } + uint64_t db_sample_interval_sec() const noexcept { + return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count(); + } + void configure_stale_reads_enabled(bool enabled) { ConfigBuilder builder; builder.allowStaleReadsDuringClusterStateTransitions = enabled; @@ -280,19 +284,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { "storage:1 .0.s:d distributor:1"); enableDistributorClusterState("storage:1 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); for (uint32_t i = 0; i < 3; ++i) { addNodesToBucketDB(document::BucketId(16, i), "0=1"); } for (int i = 0; i < 3; ++i) { tick(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } tick(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); enableDistributorClusterState("storage:2 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } // TODO -> stripe test @@ -489,14 +493,6 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) } } -namespace { - -uint64_t db_sample_interval_sec(const Distributor& d) noexcept { - return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count(); -} - -} - // TODO -> stripe test TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -517,7 +513,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim // interval has passed. Instead, old metric gauge values should be preserved. addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); - const auto sample_interval_sec = db_sample_interval_sec(getDistributor()); + const auto sample_interval_sec = db_sample_interval_sec(); getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. tickDistributorNTimes(50); distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); @@ -925,7 +921,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); - auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo(); + auto& node_info = pending_message_tracker().getNodeInfo(); EXPECT_TRUE(node_info.isBusy(0)); getClock().addSecondsToTime(99); @@ -1045,7 +1041,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { tickDistributorNTimes(5); // 1/3rds into second round through database enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // Bucket space stats should now be invalid per space per node, pending stats // from state version 2. Exposing stats from version 1 risks reporting stale // information back to the cluster controller. @@ -1066,13 +1062,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(1); // DB round not yet complete EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); // Now out of recovery mode, subsequent round completions should not send replies tickDistributorNTimes(10); EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); @@ -1080,12 +1076,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // 2 buckets with missing replicas triggering merge pending stats addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); tickDistributorNTimes(3); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); const auto space_name = FixedBucketSpaces::to_string(space); assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); // First completed scan sends off merge stats et al to cluster controller @@ -1214,7 +1210,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { set_up_and_start_get_op_with_stale_reads_enabled(true); Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); - EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage( + EXPECT_FALSE(pending_message_tracker().hasPendingMessage( 0, bucket, api::MessageType::GET_ID)); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index bdd953b6206..5d204693971 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -425,6 +425,36 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { return _distributor->getReadOnlyBucketSpaceRepo(); } +bool +DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept { + return _distributor->isInRecoveryMode(); +} + +const lib::ClusterStateBundle& +DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept { + return getDistributor().getClusterStateBundle(); +} + +std::string +DistributorTestUtil::active_ideal_state_operations() const { + return _distributor->getActiveIdealStateOperations(); +} + +const PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() const noexcept { + return _distributor->getPendingMessageTracker(); +} + +PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() noexcept { + return _distributor->getPendingMessageTracker(); +} + +std::chrono::steady_clock::duration +DistributorTestUtil::db_memory_sample_interval() const noexcept { + return _distributor->db_memory_sample_interval(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); @@ -453,4 +483,9 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); } +void +DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) { + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index b845456e873..0ed2498a0a2 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -118,9 +118,8 @@ public: storage::distributor::DistributorStripeComponent& distributor_component(); storage::distributor::DistributorStripeOperationContext& operation_context(); - Distributor& getDistributor() { - return *_distributor; - } + Distributor& getDistributor() noexcept { return *_distributor; } + const Distributor& getDistributor() const noexcept { return *_distributor; } bool tick(); @@ -140,6 +139,12 @@ public: const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; + [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; + [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; + [[nodiscard]] std::string active_ideal_state_operations() const; + [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; + [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; + [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket @@ -190,6 +195,8 @@ public: DistributorMessageSenderStub& sender() noexcept { return _sender; } const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + void setSystemState(const lib::ClusterState& systemState); protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index ce9aa0a6800..0a36e5cd0e5 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -38,10 +38,6 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { close(); } - void setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); - } - bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker, @@ -120,7 +116,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) { ost.str()); tick(); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); + EXPECT_EQ("", active_ideal_state_operations()); } @@ -143,13 +139,12 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) { EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d")); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); - EXPECT_EQ(0, _distributor->getPendingMessageTracker() - .getNodeInfo().getPendingCount(0)); + EXPECT_EQ("", active_ideal_state_operations()); + EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0)); } TEST_F(IdealStateManagerTest, recheck_when_active) { @@ -162,17 +157,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); } TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 02491b670c6..478f20796d0 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); - _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); + _sender.setPendingMessageTracker(pending_message_tracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) { return std::make_shared<ReadForWriteVisitorOperationStarter>( std::move(visitor_op), operation_sequencer(), - *_op_owner, getDistributor().getPendingMessageTracker(), + *_op_owner, pending_message_tracker(), _mock_uuid_generator); } }; @@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend std::move(nodes), api::Timestamp(123456)); merge->setAddress(make_storage_address(0)); - getDistributor().getPendingMessageTracker().insert(merge); + pending_message_tracker().insert(merge); _op_owner->start(op, OperationStarter::Priority(120)); ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " @@ -157,13 +157,13 @@ struct ConcurrentMutationFixture { _mutation = _test.sender().command(0); // Since pending message tracking normally happens in the distributor itself during sendUp, // we have to emulate this and explicitly insert the sent message into the pending mapping. - _test.getDistributor().getPendingMessageTracker().insert(_mutation); + _test.pending_message_tracker().insert(_mutation); } void unblock_bucket() { // Pretend update operation completed auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); - _test.getDistributor().getPendingMessageTracker().reply(*update_reply); + _test.pending_message_tracker().reply(*update_reply); _test._op_owner->handleReply(update_reply); } }; diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 9b49f1347cc..8310e4e38e0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { }; void enableClusterState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); + setSystemState(systemState); } void insertJoinableBuckets(); diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index ccbb64e8970..f9a4e6dbe0f 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { TEST_F(VisitorOperationTest, visit_ideal_node) { ClusterState state("distributor:1 storage:3"); - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state)); + enable_distributor_cluster_state(lib::ClusterStateBundle(state)); // Create buckets in bucketdb for (int i=0; i<32; i++ ) { |