diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-05-03 11:09:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-03 11:09:02 +0200 |
commit | 30a495955ab74ccd37533f50f2b67d97c148d48f (patch) | |
tree | 4e8fa3964be427fc358d3b3ecf185ec2fb9b76cc | |
parent | 8fb20fde4661ab73946c338e3b4b3209d2e79b73 (diff) |
Revert "Make more Distributor internals only available to friended tests"
10 files changed, 86 insertions, 138 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 7e8fec3b83a..97fccf58901 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); complete_recovery_mode(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); std::string distConfig(getDistConfig6Nodes4Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); // No replies received yet, still no recovery mode. - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); ASSERT_EQ(messageCount(6), _sender.commands().size()); uint32_t numBuckets = 10; @@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) // Pending cluster state (i.e. distribution) has been enabled, which should // cause recovery mode to be entered. - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); complete_recovery_mode(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); } namespace { @@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until "version:2 distributor:1 storage:4", n_buckets, 4)); // Version should not be switched over yet - EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion()); + EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); EXPECT_EQ(uint64_t(0), mutable_default_db().size()); EXPECT_EQ(uint64_t(0), mutable_global_db().size()); EXPECT_FALSE(activate_cluster_state_version(2)); - EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion()); + EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 2944348e639..7958306db5f 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -175,10 +175,6 @@ struct DistributorTest : Test, DistributorTestUtil { return _distributor->handleMessage(msg); } - uint64_t db_sample_interval_sec() const noexcept { - return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count(); - } - void configure_stale_reads_enabled(bool enabled) { ConfigBuilder builder; builder.allowStaleReadsDuringClusterStateTransitions = enabled; @@ -284,19 +280,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { "storage:1 .0.s:d distributor:1"); enableDistributorClusterState("storage:1 distributor:1"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); for (uint32_t i = 0; i < 3; ++i) { addNodesToBucketDB(document::BucketId(16, i), "0=1"); } for (int i = 0; i < 3; ++i) { tick(); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); } tick(); - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); enableDistributorClusterState("storage:2 distributor:1"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); } // TODO -> stripe test @@ -493,6 +489,14 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) } } +namespace { + +uint64_t db_sample_interval_sec(const Distributor& d) noexcept { + return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count(); +} + +} + // TODO -> stripe test TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -513,7 +517,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim // interval has passed. Instead, old metric gauge values should be preserved. addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); - const auto sample_interval_sec = db_sample_interval_sec(); + const auto sample_interval_sec = db_sample_interval_sec(getDistributor()); getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. tickDistributorNTimes(50); distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); @@ -921,7 +925,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); - auto& node_info = pending_message_tracker().getNodeInfo(); + auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo(); EXPECT_TRUE(node_info.isBusy(0)); getClock().addSecondsToTime(99); @@ -1041,7 +1045,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { tickDistributorNTimes(5); // 1/3rds into second round through database enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); // Bucket space stats should now be invalid per space per node, pending stats // from state version 2. Exposing stats from version 1 risks reporting stale // information back to the cluster controller. @@ -1062,13 +1066,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(1); // DB round not yet complete EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); // Now out of recovery mode, subsequent round completions should not send replies tickDistributorNTimes(10); EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); @@ -1076,12 +1080,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - EXPECT_TRUE(distributor_is_in_recovery_mode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); // 2 buckets with missing replicas triggering merge pending stats addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); tickDistributorNTimes(3); - EXPECT_FALSE(distributor_is_in_recovery_mode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); const auto space_name = FixedBucketSpaces::to_string(space); assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); // First completed scan sends off merge stats et al to cluster controller @@ -1210,7 +1214,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { set_up_and_start_get_op_with_stale_reads_enabled(true); Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); - EXPECT_FALSE(pending_message_tracker().hasPendingMessage( + EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage( 0, bucket, api::MessageType::GET_ID)); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 5d204693971..bdd953b6206 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -425,36 +425,6 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { return _distributor->getReadOnlyBucketSpaceRepo(); } -bool -DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept { - return _distributor->isInRecoveryMode(); -} - -const lib::ClusterStateBundle& -DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept { - return getDistributor().getClusterStateBundle(); -} - -std::string -DistributorTestUtil::active_ideal_state_operations() const { - return _distributor->getActiveIdealStateOperations(); -} - -const PendingMessageTracker& -DistributorTestUtil::pending_message_tracker() const noexcept { - return _distributor->getPendingMessageTracker(); -} - -PendingMessageTracker& -DistributorTestUtil::pending_message_tracker() noexcept { - return _distributor->getPendingMessageTracker(); -} - -std::chrono::steady_clock::duration -DistributorTestUtil::db_memory_sample_interval() const noexcept { - return _distributor->db_memory_sample_interval(); -} - const lib::Distribution& DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); @@ -483,9 +453,4 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); } -void -DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); -} - } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 0ed2498a0a2..b845456e873 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -118,8 +118,9 @@ public: storage::distributor::DistributorStripeComponent& distributor_component(); storage::distributor::DistributorStripeOperationContext& operation_context(); - Distributor& getDistributor() noexcept { return *_distributor; } - const Distributor& getDistributor() const noexcept { return *_distributor; } + Distributor& getDistributor() { + return *_distributor; + } bool tick(); @@ -139,12 +140,6 @@ public: const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; - [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; - [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; - [[nodiscard]] std::string active_ideal_state_operations() const; - [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; - [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; - [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket @@ -195,8 +190,6 @@ public: DistributorMessageSenderStub& sender() noexcept { return _sender; } const DistributorMessageSenderStub& sender() const noexcept { return _sender; } - - void setSystemState(const lib::ClusterState& systemState); protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 0a36e5cd0e5..ce9aa0a6800 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -38,6 +38,10 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { close(); } + void setSystemState(const lib::ClusterState& systemState) { + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); + } + bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker, @@ -116,7 +120,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) { ost.str()); tick(); - EXPECT_EQ("", active_ideal_state_operations()); + EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); } @@ -139,12 +143,13 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) { EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n", - active_ideal_state_operations()); + _distributor->getActiveIdealStateOperations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d")); - EXPECT_EQ("", active_ideal_state_operations()); - EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0)); + EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); + EXPECT_EQ(0, _distributor->getPendingMessageTracker() + .getNodeInfo().getPendingCount(0)); } TEST_F(IdealStateManagerTest, recheck_when_active) { @@ -157,17 +162,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - active_ideal_state_operations()); + _distributor->getActiveIdealStateOperations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - active_ideal_state_operations()); + _distributor->getActiveIdealStateOperations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - active_ideal_state_operations()); + _distributor->getActiveIdealStateOperations()); } TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 478f20796d0..02491b670c6 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); - _sender.setPendingMessageTracker(pending_message_tracker()); + _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) { return std::make_shared<ReadForWriteVisitorOperationStarter>( std::move(visitor_op), operation_sequencer(), - *_op_owner, pending_message_tracker(), + *_op_owner, getDistributor().getPendingMessageTracker(), _mock_uuid_generator); } }; @@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend std::move(nodes), api::Timestamp(123456)); merge->setAddress(make_storage_address(0)); - pending_message_tracker().insert(merge); + getDistributor().getPendingMessageTracker().insert(merge); _op_owner->start(op, OperationStarter::Priority(120)); ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " @@ -157,13 +157,13 @@ struct ConcurrentMutationFixture { _mutation = _test.sender().command(0); // Since pending message tracking normally happens in the distributor itself during sendUp, // we have to emulate this and explicitly insert the sent message into the pending mapping. - _test.pending_message_tracker().insert(_mutation); + _test.getDistributor().getPendingMessageTracker().insert(_mutation); } void unblock_bucket() { // Pretend update operation completed auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); - _test.pending_message_tracker().reply(*update_reply); + _test.getDistributor().getPendingMessageTracker().reply(*update_reply); _test._op_owner->handleReply(update_reply); } }; diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 8310e4e38e0..9b49f1347cc 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { }; void enableClusterState(const lib::ClusterState& systemState) { - setSystemState(systemState); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); } void insertJoinableBuckets(); diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index f9a4e6dbe0f..ccbb64e8970 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { TEST_F(VisitorOperationTest, visit_ideal_node) { ClusterState state("distributor:1 storage:3"); - enable_distributor_cluster_state(lib::ClusterStateBundle(state)); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state)); // Create buckets in bucketdb for (int i=0; i<32; i++ ) { diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 04d8560298a..bdf3f30fdc9 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -54,9 +54,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _comp_reg(compReg), _metrics(std::make_shared<DistributorMetricSet>()), _messageSender(messageSender), - _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, - doneInitHandler, *this, _use_legacy_mode)), + doneInitHandler, *this, (num_distributor_stripes == 0))), _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), @@ -72,7 +71,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); - if (!_use_legacy_mode) { + if (num_distributor_stripes > 0) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, @@ -91,54 +90,43 @@ Distributor::~Distributor() closeNextLink(); } -// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. -// All functions below that assert on _use_legacy_mode are only currently used by tests - bool Distributor::isInRecoveryMode() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->isInRecoveryMode(); } const PendingMessageTracker& Distributor::getPendingMessageTracker() const { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } PendingMessageTracker& Distributor::getPendingMessageTracker() { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } DistributorBucketSpaceRepo& Distributor::getReadOnlyBucketSpaceRepo() noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getReadyOnlyBucketSpaceRepo() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo();; } storage::distributor::DistributorStripeComponent& Distributor::distributor_component() noexcept { - assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE We need to grab the stripe's component since tests like to access // these things uncomfortably directly. return _stripe->_component; @@ -146,55 +134,46 @@ Distributor::distributor_component() noexcept { StripeBucketDBUpdater& Distributor::bucket_db_updater() { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } const StripeBucketDBUpdater& Distributor::bucket_db_updater() const { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } IdealStateManager& Distributor::ideal_state_manager() { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } const IdealStateManager& Distributor::ideal_state_manager() const { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } ExternalOperationHandler& Distributor::external_operation_handler() { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } const ExternalOperationHandler& Distributor::external_operation_handler() const { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } BucketDBMetricUpdater& Distributor::bucket_db_metric_updater() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->_bucketDBMetricUpdater; } const DistributorConfiguration& Distributor::getConfig() const { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->getConfig(); } std::chrono::steady_clock::duration Distributor::db_memory_sample_interval() const noexcept { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->db_memory_sample_interval(); } @@ -275,7 +254,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread // regardless of what RPC thread (comm mgr, FRT...) this is called from! - if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) { + if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) { return msg->callHandler(*_bucket_db_updater, msg); } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? @@ -286,7 +265,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { - if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { + if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) { return reply->callHandler(*_bucket_db_updater, reply); } return _stripe->handleReply(reply); @@ -296,7 +275,6 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { - assert(_use_legacy_mode); // TODO STRIPE return _stripe->handleMessage(msg); } @@ -321,7 +299,6 @@ Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) const lib::ClusterStateBundle& Distributor::getClusterStateBundle() const { - assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE must offer a single unifying state across stripes return _stripe->getClusterStateBundle(); } @@ -329,7 +306,6 @@ Distributor::getClusterStateBundle() const void Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) { - assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE make test injection/force-function _stripe->enableClusterStateBundle(state); } @@ -337,7 +313,7 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) void Distributor::storageDistributionChanged() { - if (!_use_legacy_mode) { + if (_bucket_db_updater) { if (!_distribution || (*_component.getDistribution() != *_distribution)) { LOG(debug, "Distribution changed to %s, must re-fetch bucket information", _component.getDistribution()->toString().c_str()); @@ -356,7 +332,7 @@ Distributor::storageDistributionChanged() void Distributor::enableNextDistribution() { - if (!_use_legacy_mode) { + if (_bucket_db_updater) { if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); @@ -374,7 +350,7 @@ void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - // TODO STRIPE cannot directly access stripe when not in legacy mode! + // TODO STRIPE top-level bucket DB updater _stripe->propagateDefaultDistribution(std::move(distribution)); } @@ -407,7 +383,6 @@ Distributor::propagateInternalScanMetricsToExternal() void Distributor::scanAllBuckets() { - assert(_use_legacy_mode); // TODO STRIPE _stripe->scanAllBuckets(); } @@ -415,12 +390,11 @@ framework::ThreadWaitInfo Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (!_use_legacy_mode) { + if (_bucket_db_updater) { enableNextDistribution(); } // Propagates any new configs down to stripe(s) enableNextConfig(); - // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise _stripe->doCriticalTick(idx); _tickResult.merge(_stripe->_tickResult); return _tickResult; @@ -429,7 +403,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - if (!_use_legacy_mode) { + if (_bucket_db_updater) { _bucket_db_updater->resend_delayed_messages(); } // TODO STRIPE stripes need their own thread loops! @@ -443,7 +417,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c { // Only lazily trigger a config propagation and internal update if something has _actually changed_. if (_component.internal_config_generation() != _current_internal_config_generation) { - if (!_use_legacy_mode) { + if (_bucket_db_updater) { _total_config = _component.total_distributor_config_sp(); auto guard = _stripe_accessor->rendezvous_and_hold_all(); guard->update_total_distributor_config(_component.total_distributor_config_sp()); @@ -453,7 +427,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _current_internal_config_generation = _component.internal_config_generation(); } - if (_use_legacy_mode) { + if (!_bucket_db_updater) { // TODO STRIPE remove these once tests are fixed to trigger reconfig properly _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 0420f1b1f22..71d71290bb5 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -83,6 +83,12 @@ public: int getDistributorIndex() const override { return _component.node_index(); } const ClusterContext& cluster_context() const override { return _component.cluster_context(); } + /** + * Enables a new cluster state. Called after the bucket db updater has + * retrieved all bucket info related to the change. + */ + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); + void storageDistributionChanged() override; bool handleReply(const std::shared_ptr<api::StorageReply>& reply); @@ -93,9 +99,26 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; + std::string getActiveIdealStateOperations() const; + virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; + const lib::ClusterStateBundle& getClusterStateBundle() const; + const DistributorConfiguration& getConfig() const; + + bool isInRecoveryMode() const noexcept; + + PendingMessageTracker& getPendingMessageTracker(); + const PendingMessageTracker& getPendingMessageTracker() const; + + DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; + + storage::distributor::DistributorStripeComponent& distributor_component() noexcept; + class MetricUpdateHook : public framework::MetricUpdateHook { public: @@ -112,6 +135,8 @@ public: Distributor& _self; }; + std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + private: friend struct DistributorTest; friend class BucketDBUpdaterTest; @@ -121,31 +146,14 @@ private: void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); - /** - * Enables a new cluster state. Used by tests to bypass BucketDBUpdater. - */ - void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); - // Accessors used by tests - std::string getActiveIdealStateOperations() const; - const lib::ClusterStateBundle& getClusterStateBundle() const; - const DistributorConfiguration& getConfig() const; - bool isInRecoveryMode() const noexcept; - PendingMessageTracker& getPendingMessageTracker(); - const PendingMessageTracker& getPendingMessageTracker() const; - DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; - storage::distributor::DistributorStripeComponent& distributor_component() noexcept; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; - StripeBucketDBUpdater& bucket_db_updater(); const StripeBucketDBUpdater& bucket_db_updater() const; IdealStateManager& ideal_state_manager(); const IdealStateManager& ideal_state_manager() const; ExternalOperationHandler& external_operation_handler(); const ExternalOperationHandler& external_operation_handler() const; + BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept; /** @@ -169,7 +177,6 @@ private: DistributorComponentRegister& _comp_reg; std::shared_ptr<DistributorMetricSet> _metrics; ChainedMessageSender* _messageSender; - const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. std::unique_ptr<DistributorStripe> _stripe; std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; |