diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-04-30 13:45:59 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-03 10:47:26 +0000 |
commit | 6717a278312b4a85b83afcfb52276fe3c1e54da6 (patch) | |
tree | 5df4ccf27e6e043a7cfcc6446928e003f607f975 /storage | |
parent | 595bbf9b21eaa2d0065d4c6c01cc0f7d5f962c8b (diff) |
Make more Distributor internals only available to friended tests
Also add more assertions that such functions are only called in legacy mode.
Diffstat (limited to 'storage')
10 files changed, 138 insertions, 86 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 97fccf58901..7e8fec3b83a 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); std::string distConfig(getDistConfig6Nodes4Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); // No replies received yet, still no recovery mode. - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); ASSERT_EQ(messageCount(6), _sender.commands().size()); uint32_t numBuckets = 10; @@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) // Pending cluster state (i.e. distribution) has been enabled, which should // cause recovery mode to be entered. - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); } namespace { @@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until "version:2 distributor:1 storage:4", n_buckets, 4)); // Version should not be switched over yet - EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(0), mutable_default_db().size()); EXPECT_EQ(uint64_t(0), mutable_global_db().size()); EXPECT_FALSE(activate_cluster_state_version(2)); - EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 7958306db5f..2944348e639 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -175,6 +175,10 @@ struct DistributorTest : Test, DistributorTestUtil { return _distributor->handleMessage(msg); } + uint64_t db_sample_interval_sec() const noexcept { + return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count(); + } + void configure_stale_reads_enabled(bool enabled) { ConfigBuilder builder; builder.allowStaleReadsDuringClusterStateTransitions = enabled; @@ -280,19 +284,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { "storage:1 .0.s:d distributor:1"); enableDistributorClusterState("storage:1 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); for (uint32_t i = 0; i < 3; ++i) { addNodesToBucketDB(document::BucketId(16, i), "0=1"); } for (int i = 0; i < 3; ++i) { tick(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } tick(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); enableDistributorClusterState("storage:2 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } // TODO -> stripe test @@ -489,14 +493,6 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) } } -namespace { - -uint64_t db_sample_interval_sec(const Distributor& d) noexcept { - return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count(); -} - -} - // TODO -> stripe test TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -517,7 +513,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim // interval has passed. Instead, old metric gauge values should be preserved. addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); - const auto sample_interval_sec = db_sample_interval_sec(getDistributor()); + const auto sample_interval_sec = db_sample_interval_sec(); getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. tickDistributorNTimes(50); distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); @@ -925,7 +921,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); - auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo(); + auto& node_info = pending_message_tracker().getNodeInfo(); EXPECT_TRUE(node_info.isBusy(0)); getClock().addSecondsToTime(99); @@ -1045,7 +1041,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { tickDistributorNTimes(5); // 1/3rds into second round through database enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // Bucket space stats should now be invalid per space per node, pending stats // from state version 2. Exposing stats from version 1 risks reporting stale // information back to the cluster controller. @@ -1066,13 +1062,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(1); // DB round not yet complete EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); // Now out of recovery mode, subsequent round completions should not send replies tickDistributorNTimes(10); EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); @@ -1080,12 +1076,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // 2 buckets with missing replicas triggering merge pending stats addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); tickDistributorNTimes(3); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); const auto space_name = FixedBucketSpaces::to_string(space); assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); // First completed scan sends off merge stats et al to cluster controller @@ -1214,7 +1210,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { set_up_and_start_get_op_with_stale_reads_enabled(true); Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); - EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage( + EXPECT_FALSE(pending_message_tracker().hasPendingMessage( 0, bucket, api::MessageType::GET_ID)); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index bdd953b6206..5d204693971 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -425,6 +425,36 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { return _distributor->getReadOnlyBucketSpaceRepo(); } +bool +DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept { + return _distributor->isInRecoveryMode(); +} + +const lib::ClusterStateBundle& +DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept { + return getDistributor().getClusterStateBundle(); +} + +std::string +DistributorTestUtil::active_ideal_state_operations() const { + return _distributor->getActiveIdealStateOperations(); +} + +const PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() const noexcept { + return _distributor->getPendingMessageTracker(); +} + +PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() noexcept { + return _distributor->getPendingMessageTracker(); +} + +std::chrono::steady_clock::duration +DistributorTestUtil::db_memory_sample_interval() const noexcept { + return _distributor->db_memory_sample_interval(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); @@ -453,4 +483,9 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); } +void +DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) { + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index b845456e873..0ed2498a0a2 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -118,9 +118,8 @@ public: storage::distributor::DistributorStripeComponent& distributor_component(); storage::distributor::DistributorStripeOperationContext& operation_context(); - Distributor& getDistributor() { - return *_distributor; - } + Distributor& getDistributor() noexcept { return *_distributor; } + const Distributor& getDistributor() const noexcept { return *_distributor; } bool tick(); @@ -140,6 +139,12 @@ public: const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; + [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; + [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; + [[nodiscard]] std::string active_ideal_state_operations() const; + [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; + [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; + [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket @@ -190,6 +195,8 @@ public: DistributorMessageSenderStub& sender() noexcept { return _sender; } const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + void setSystemState(const lib::ClusterState& systemState); protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index ce9aa0a6800..0a36e5cd0e5 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -38,10 +38,6 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { close(); } - void setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); - } - bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker, @@ -120,7 +116,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) { ost.str()); tick(); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); + EXPECT_EQ("", active_ideal_state_operations()); } @@ -143,13 +139,12 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) { EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d")); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); - EXPECT_EQ(0, _distributor->getPendingMessageTracker() - .getNodeInfo().getPendingCount(0)); + EXPECT_EQ("", active_ideal_state_operations()); + EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0)); } TEST_F(IdealStateManagerTest, recheck_when_active) { @@ -162,17 +157,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); } TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 02491b670c6..478f20796d0 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); - _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); + _sender.setPendingMessageTracker(pending_message_tracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) { return std::make_shared<ReadForWriteVisitorOperationStarter>( std::move(visitor_op), operation_sequencer(), - *_op_owner, getDistributor().getPendingMessageTracker(), + *_op_owner, pending_message_tracker(), _mock_uuid_generator); } }; @@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend std::move(nodes), api::Timestamp(123456)); merge->setAddress(make_storage_address(0)); - getDistributor().getPendingMessageTracker().insert(merge); + pending_message_tracker().insert(merge); _op_owner->start(op, OperationStarter::Priority(120)); ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " @@ -157,13 +157,13 @@ struct ConcurrentMutationFixture { _mutation = _test.sender().command(0); // Since pending message tracking normally happens in the distributor itself during sendUp, // we have to emulate this and explicitly insert the sent message into the pending mapping. - _test.getDistributor().getPendingMessageTracker().insert(_mutation); + _test.pending_message_tracker().insert(_mutation); } void unblock_bucket() { // Pretend update operation completed auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); - _test.getDistributor().getPendingMessageTracker().reply(*update_reply); + _test.pending_message_tracker().reply(*update_reply); _test._op_owner->handleReply(update_reply); } }; diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 9b49f1347cc..8310e4e38e0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { }; void enableClusterState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); + setSystemState(systemState); } void insertJoinableBuckets(); diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index ccbb64e8970..f9a4e6dbe0f 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { TEST_F(VisitorOperationTest, visit_ideal_node) { ClusterState state("distributor:1 storage:3"); - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state)); + enable_distributor_cluster_state(lib::ClusterStateBundle(state)); // Create buckets in bucketdb for (int i=0; i<32; i++ ) { diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index bdf3f30fdc9..04d8560298a 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -54,8 +54,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _comp_reg(compReg), _metrics(std::make_shared<DistributorMetricSet>()), _messageSender(messageSender), + _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, - doneInitHandler, *this, (num_distributor_stripes == 0))), + doneInitHandler, *this, _use_legacy_mode)), _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), @@ -71,7 +72,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); - if (num_distributor_stripes > 0) { + if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, @@ -90,43 +91,54 @@ Distributor::~Distributor() closeNextLink(); } +// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. +// All functions below that assert on _use_legacy_mode are only currently used by tests + bool Distributor::isInRecoveryMode() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->isInRecoveryMode(); } const PendingMessageTracker& Distributor::getPendingMessageTracker() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } PendingMessageTracker& Distributor::getPendingMessageTracker() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } DistributorBucketSpaceRepo& Distributor::getReadOnlyBucketSpaceRepo() noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getReadyOnlyBucketSpaceRepo() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo();; } storage::distributor::DistributorStripeComponent& Distributor::distributor_component() noexcept { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE We need to grab the stripe's component since tests like to access // these things uncomfortably directly. return _stripe->_component; @@ -134,46 +146,55 @@ Distributor::distributor_component() noexcept { StripeBucketDBUpdater& Distributor::bucket_db_updater() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } const StripeBucketDBUpdater& Distributor::bucket_db_updater() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } IdealStateManager& Distributor::ideal_state_manager() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } const IdealStateManager& Distributor::ideal_state_manager() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } ExternalOperationHandler& Distributor::external_operation_handler() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } const ExternalOperationHandler& Distributor::external_operation_handler() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } BucketDBMetricUpdater& Distributor::bucket_db_metric_updater() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->_bucketDBMetricUpdater; } const DistributorConfiguration& Distributor::getConfig() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getConfig(); } std::chrono::steady_clock::duration Distributor::db_memory_sample_interval() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->db_memory_sample_interval(); } @@ -254,7 +275,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread // regardless of what RPC thread (comm mgr, FRT...) this is called from! - if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) { + if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) { return msg->callHandler(*_bucket_db_updater, msg); } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? @@ -265,7 +286,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { - if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) { + if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { return reply->callHandler(*_bucket_db_updater, reply); } return _stripe->handleReply(reply); @@ -275,6 +296,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->handleMessage(msg); } @@ -299,6 +321,7 @@ Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) const lib::ClusterStateBundle& Distributor::getClusterStateBundle() const { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE must offer a single unifying state across stripes return _stripe->getClusterStateBundle(); } @@ -306,6 +329,7 @@ Distributor::getClusterStateBundle() const void Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE make test injection/force-function _stripe->enableClusterStateBundle(state); } @@ -313,7 +337,7 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) void Distributor::storageDistributionChanged() { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { if (!_distribution || (*_component.getDistribution() != *_distribution)) { LOG(debug, "Distribution changed to %s, must re-fetch bucket information", _component.getDistribution()->toString().c_str()); @@ -332,7 +356,7 @@ Distributor::storageDistributionChanged() void Distributor::enableNextDistribution() { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); @@ -350,7 +374,7 @@ void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - // TODO STRIPE top-level bucket DB updater + // TODO STRIPE cannot directly access stripe when not in legacy mode! _stripe->propagateDefaultDistribution(std::move(distribution)); } @@ -383,6 +407,7 @@ Distributor::propagateInternalScanMetricsToExternal() void Distributor::scanAllBuckets() { + assert(_use_legacy_mode); // TODO STRIPE _stripe->scanAllBuckets(); } @@ -390,11 +415,12 @@ framework::ThreadWaitInfo Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (_bucket_db_updater) { + if (!_use_legacy_mode) { enableNextDistribution(); } // Propagates any new configs down to stripe(s) enableNextConfig(); + // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise _stripe->doCriticalTick(idx); _tickResult.merge(_stripe->_tickResult); return _tickResult; @@ -403,7 +429,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { _bucket_db_updater->resend_delayed_messages(); } // TODO STRIPE stripes need their own thread loops! @@ -417,7 +443,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c { // Only lazily trigger a config propagation and internal update if something has _actually changed_. if (_component.internal_config_generation() != _current_internal_config_generation) { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { _total_config = _component.total_distributor_config_sp(); auto guard = _stripe_accessor->rendezvous_and_hold_all(); guard->update_total_distributor_config(_component.total_distributor_config_sp()); @@ -427,7 +453,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _current_internal_config_generation = _component.internal_config_generation(); } - if (!_bucket_db_updater) { + if (_use_legacy_mode) { // TODO STRIPE remove these once tests are fixed to trigger reconfig properly _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 71d71290bb5..0420f1b1f22 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -83,12 +83,6 @@ public: int getDistributorIndex() const override { return _component.node_index(); } const ClusterContext& cluster_context() const override { return _component.cluster_context(); } - /** - * Enables a new cluster state. Called after the bucket db updater has - * retrieved all bucket info related to the change. - */ - void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); - void storageDistributionChanged() override; bool handleReply(const std::shared_ptr<api::StorageReply>& reply); @@ -99,26 +93,9 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - std::string getActiveIdealStateOperations() const; - virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; - const lib::ClusterStateBundle& getClusterStateBundle() const; - const DistributorConfiguration& getConfig() const; - - bool isInRecoveryMode() const noexcept; - - PendingMessageTracker& getPendingMessageTracker(); - const PendingMessageTracker& getPendingMessageTracker() const; - - DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; - - storage::distributor::DistributorStripeComponent& distributor_component() noexcept; - class MetricUpdateHook : public framework::MetricUpdateHook { public: @@ -135,8 +112,6 @@ public: Distributor& _self; }; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; - private: friend struct DistributorTest; friend class BucketDBUpdaterTest; @@ -146,14 +121,31 @@ private: void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); + /** + * Enables a new cluster state. Used by tests to bypass BucketDBUpdater. + */ + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); + // Accessors used by tests + std::string getActiveIdealStateOperations() const; + const lib::ClusterStateBundle& getClusterStateBundle() const; + const DistributorConfiguration& getConfig() const; + bool isInRecoveryMode() const noexcept; + PendingMessageTracker& getPendingMessageTracker(); + const PendingMessageTracker& getPendingMessageTracker() const; + DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; + storage::distributor::DistributorStripeComponent& distributor_component() noexcept; + std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + StripeBucketDBUpdater& bucket_db_updater(); const StripeBucketDBUpdater& bucket_db_updater() const; IdealStateManager& ideal_state_manager(); const IdealStateManager& ideal_state_manager() const; ExternalOperationHandler& external_operation_handler(); const ExternalOperationHandler& external_operation_handler() const; - BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept; /** @@ -177,6 +169,7 @@ private: DistributorComponentRegister& _comp_reg; std::shared_ptr<DistributorMetricSet> _metrics; ChainedMessageSender* _messageSender; + const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. std::unique_ptr<DistributorStripe> _stripe; std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; |