diff options
Diffstat (limited to 'storage')
10 files changed, 138 insertions, 86 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 97fccf58901..7e8fec3b83a 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); std::string distConfig(getDistConfig6Nodes4Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); // No replies received yet, still no recovery mode. - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); ASSERT_EQ(messageCount(6), _sender.commands().size()); uint32_t numBuckets = 10; @@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) // Pending cluster state (i.e. distribution) has been enabled, which should // cause recovery mode to be entered. - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); complete_recovery_mode(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); } namespace { @@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until "version:2 distributor:1 storage:4", n_buckets, 4)); // Version should not be switched over yet - EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(0), mutable_default_db().size()); EXPECT_EQ(uint64_t(0), mutable_global_db().size()); EXPECT_FALSE(activate_cluster_state_version(2)); - EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion()); EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 7958306db5f..2944348e639 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -175,6 +175,10 @@ struct DistributorTest : Test, DistributorTestUtil { return _distributor->handleMessage(msg); } + uint64_t db_sample_interval_sec() const noexcept { + return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count(); + } + void configure_stale_reads_enabled(bool enabled) { ConfigBuilder builder; builder.allowStaleReadsDuringClusterStateTransitions = enabled; @@ -280,19 +284,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) { "storage:1 .0.s:d distributor:1"); enableDistributorClusterState("storage:1 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); for (uint32_t i = 0; i < 3; ++i) { addNodesToBucketDB(document::BucketId(16, i), "0=1"); } for (int i = 0; i < 3; ++i) { tick(); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } tick(); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); enableDistributorClusterState("storage:2 distributor:1"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); } // TODO -> stripe test @@ -489,14 +493,6 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics) } } -namespace { - -uint64_t db_sample_interval_sec(const Distributor& d) noexcept { - return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count(); -} - -} - // TODO -> stripe test TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) { getClock().setAbsoluteTimeInSeconds(1000); @@ -517,7 +513,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim // interval has passed. Instead, old metric gauge values should be preserved. addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2"); - const auto sample_interval_sec = db_sample_interval_sec(getDistributor()); + const auto sample_interval_sec = db_sample_interval_sec(); getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet. tickDistributorNTimes(50); distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); @@ -925,7 +921,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes reply->setResult(api::ReturnCode(api::ReturnCode::BUSY)); _distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply))); - auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo(); + auto& node_info = pending_message_tracker().getNodeInfo(); EXPECT_TRUE(node_info.isBusy(0)); getClock().addSecondsToTime(99); @@ -1045,7 +1041,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) { tickDistributorNTimes(5); // 1/3rds into second round through database enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // Bucket space stats should now be invalid per space per node, pending stats // from state version 2. Exposing stats from version 1 risks reporting stale // information back to the cluster controller. @@ -1066,13 +1062,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a"); enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(1); // DB round not yet complete EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); // Now out of recovery mode, subsequent round completions should not send replies tickDistributorNTimes(10); EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); @@ -1080,12 +1076,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) { setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); - EXPECT_TRUE(_distributor->isInRecoveryMode()); + EXPECT_TRUE(distributor_is_in_recovery_mode()); // 2 buckets with missing replicas triggering merge pending stats addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a"); addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a"); tickDistributorNTimes(3); - EXPECT_FALSE(_distributor->isInRecoveryMode()); + EXPECT_FALSE(distributor_is_in_recovery_mode()); const auto space_name = FixedBucketSpaces::to_string(space); assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats()); // First completed scan sends off merge stats et al to cluster controller @@ -1214,7 +1210,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) { set_up_and_start_get_op_with_stale_reads_enabled(true); Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1)); - EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage( + EXPECT_FALSE(pending_message_tracker().hasPendingMessage( 0, bucket, api::MessageType::GET_ID)); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index bdd953b6206..5d204693971 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -425,6 +425,36 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { return _distributor->getReadOnlyBucketSpaceRepo(); } +bool +DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept { + return _distributor->isInRecoveryMode(); +} + +const lib::ClusterStateBundle& +DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept { + return getDistributor().getClusterStateBundle(); +} + +std::string +DistributorTestUtil::active_ideal_state_operations() const { + return _distributor->getActiveIdealStateOperations(); +} + +const PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() const noexcept { + return _distributor->getPendingMessageTracker(); +} + +PendingMessageTracker& +DistributorTestUtil::pending_message_tracker() noexcept { + return _distributor->getPendingMessageTracker(); +} + +std::chrono::steady_clock::duration +DistributorTestUtil::db_memory_sample_interval() const noexcept { + return _distributor->db_memory_sample_interval(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); @@ -453,4 +483,9 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun getBucketDBUpdater().simulate_cluster_state_bundle_activation(state); } +void +DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) { + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index b845456e873..0ed2498a0a2 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -118,9 +118,8 @@ public: storage::distributor::DistributorStripeComponent& distributor_component(); storage::distributor::DistributorStripeOperationContext& operation_context(); - Distributor& getDistributor() { - return *_distributor; - } + Distributor& getDistributor() noexcept { return *_distributor; } + const Distributor& getDistributor() const noexcept { return *_distributor; } bool tick(); @@ -140,6 +139,12 @@ public: const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; + [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept; + [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept; + [[nodiscard]] std::string active_ideal_state_operations() const; + [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; + [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; + [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket @@ -190,6 +195,8 @@ public: DistributorMessageSenderStub& sender() noexcept { return _sender; } const DistributorMessageSenderStub& sender() const noexcept { return _sender; } + + void setSystemState(const lib::ClusterState& systemState); protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index ce9aa0a6800..0a36e5cd0e5 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -38,10 +38,6 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { close(); } - void setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); - } - bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker, @@ -120,7 +116,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) { ost.str()); tick(); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); + EXPECT_EQ("", active_ideal_state_operations()); } @@ -143,13 +139,12 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) { EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n" "setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d")); - EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); - EXPECT_EQ(0, _distributor->getPendingMessageTracker() - .getNodeInfo().getPendingCount(0)); + EXPECT_EQ("", active_ideal_state_operations()); + EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0)); } TEST_F(IdealStateManagerTest, recheck_when_active) { @@ -162,17 +157,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", - _distributor->getActiveIdealStateOperations()); + active_ideal_state_operations()); } TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 02491b670c6..478f20796d0 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); - _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); + _sender.setPendingMessageTracker(pending_message_tracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) { return std::make_shared<ReadForWriteVisitorOperationStarter>( std::move(visitor_op), operation_sequencer(), - *_op_owner, getDistributor().getPendingMessageTracker(), + *_op_owner, pending_message_tracker(), _mock_uuid_generator); } }; @@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend std::move(nodes), api::Timestamp(123456)); merge->setAddress(make_storage_address(0)); - getDistributor().getPendingMessageTracker().insert(merge); + pending_message_tracker().insert(merge); _op_owner->start(op, OperationStarter::Priority(120)); ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " @@ -157,13 +157,13 @@ struct ConcurrentMutationFixture { _mutation = _test.sender().command(0); // Since pending message tracking normally happens in the distributor itself during sendUp, // we have to emulate this and explicitly insert the sent message into the pending mapping. - _test.getDistributor().getPendingMessageTracker().insert(_mutation); + _test.pending_message_tracker().insert(_mutation); } void unblock_bucket() { // Pretend update operation completed auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); - _test.getDistributor().getPendingMessageTracker().reply(*update_reply); + _test.pending_message_tracker().reply(*update_reply); _test._op_owner->handleReply(update_reply); } }; diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 9b49f1347cc..8310e4e38e0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { }; void enableClusterState(const lib::ClusterState& systemState) { - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); + setSystemState(systemState); } void insertJoinableBuckets(); diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index ccbb64e8970..f9a4e6dbe0f 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { TEST_F(VisitorOperationTest, visit_ideal_node) { ClusterState state("distributor:1 storage:3"); - _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state)); + enable_distributor_cluster_state(lib::ClusterStateBundle(state)); // Create buckets in bucketdb for (int i=0; i<32; i++ ) { diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index bdf3f30fdc9..04d8560298a 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -54,8 +54,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _comp_reg(compReg), _metrics(std::make_shared<DistributorMetricSet>()), _messageSender(messageSender), + _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, - doneInitHandler, *this, (num_distributor_stripes == 0))), + doneInitHandler, *this, _use_legacy_mode)), _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), @@ -71,7 +72,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); - if (num_distributor_stripes > 0) { + if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, @@ -90,43 +91,54 @@ Distributor::~Distributor() closeNextLink(); } +// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. +// All functions below that assert on _use_legacy_mode are only currently used by tests + bool Distributor::isInRecoveryMode() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->isInRecoveryMode(); } const PendingMessageTracker& Distributor::getPendingMessageTracker() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } PendingMessageTracker& Distributor::getPendingMessageTracker() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getPendingMessageTracker(); } DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getBucketSpaceRepo() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getBucketSpaceRepo(); } DistributorBucketSpaceRepo& Distributor::getReadOnlyBucketSpaceRepo() noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo(); } const DistributorBucketSpaceRepo& Distributor::getReadyOnlyBucketSpaceRepo() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getReadOnlyBucketSpaceRepo();; } storage::distributor::DistributorStripeComponent& Distributor::distributor_component() noexcept { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE We need to grab the stripe's component since tests like to access // these things uncomfortably directly. return _stripe->_component; @@ -134,46 +146,55 @@ Distributor::distributor_component() noexcept { StripeBucketDBUpdater& Distributor::bucket_db_updater() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } const StripeBucketDBUpdater& Distributor::bucket_db_updater() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->bucket_db_updater(); } IdealStateManager& Distributor::ideal_state_manager() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } const IdealStateManager& Distributor::ideal_state_manager() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->ideal_state_manager(); } ExternalOperationHandler& Distributor::external_operation_handler() { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } const ExternalOperationHandler& Distributor::external_operation_handler() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->external_operation_handler(); } BucketDBMetricUpdater& Distributor::bucket_db_metric_updater() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->_bucketDBMetricUpdater; } const DistributorConfiguration& Distributor::getConfig() const { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->getConfig(); } std::chrono::steady_clock::duration Distributor::db_memory_sample_interval() const noexcept { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->db_memory_sample_interval(); } @@ -254,7 +275,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread // regardless of what RPC thread (comm mgr, FRT...) this is called from! - if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) { + if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) { return msg->callHandler(*_bucket_db_updater, msg); } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? @@ -265,7 +286,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { - if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) { + if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { return reply->callHandler(*_bucket_db_updater, reply); } return _stripe->handleReply(reply); @@ -275,6 +296,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { + assert(_use_legacy_mode); // TODO STRIPE return _stripe->handleMessage(msg); } @@ -299,6 +321,7 @@ Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) const lib::ClusterStateBundle& Distributor::getClusterStateBundle() const { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE must offer a single unifying state across stripes return _stripe->getClusterStateBundle(); } @@ -306,6 +329,7 @@ Distributor::getClusterStateBundle() const void Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) { + assert(_use_legacy_mode); // TODO STRIPE // TODO STRIPE make test injection/force-function _stripe->enableClusterStateBundle(state); } @@ -313,7 +337,7 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) void Distributor::storageDistributionChanged() { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { if (!_distribution || (*_component.getDistribution() != *_distribution)) { LOG(debug, "Distribution changed to %s, must re-fetch bucket information", _component.getDistribution()->toString().c_str()); @@ -332,7 +356,7 @@ Distributor::storageDistributionChanged() void Distributor::enableNextDistribution() { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); @@ -350,7 +374,7 @@ void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - // TODO STRIPE top-level bucket DB updater + // TODO STRIPE cannot directly access stripe when not in legacy mode! _stripe->propagateDefaultDistribution(std::move(distribution)); } @@ -383,6 +407,7 @@ Distributor::propagateInternalScanMetricsToExternal() void Distributor::scanAllBuckets() { + assert(_use_legacy_mode); // TODO STRIPE _stripe->scanAllBuckets(); } @@ -390,11 +415,12 @@ framework::ThreadWaitInfo Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (_bucket_db_updater) { + if (!_use_legacy_mode) { enableNextDistribution(); } // Propagates any new configs down to stripe(s) enableNextConfig(); + // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise _stripe->doCriticalTick(idx); _tickResult.merge(_stripe->_tickResult); return _tickResult; @@ -403,7 +429,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { _bucket_db_updater->resend_delayed_messages(); } // TODO STRIPE stripes need their own thread loops! @@ -417,7 +443,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c { // Only lazily trigger a config propagation and internal update if something has _actually changed_. if (_component.internal_config_generation() != _current_internal_config_generation) { - if (_bucket_db_updater) { + if (!_use_legacy_mode) { _total_config = _component.total_distributor_config_sp(); auto guard = _stripe_accessor->rendezvous_and_hold_all(); guard->update_total_distributor_config(_component.total_distributor_config_sp()); @@ -427,7 +453,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _current_internal_config_generation = _component.internal_config_generation(); } - if (!_bucket_db_updater) { + if (_use_legacy_mode) { // TODO STRIPE remove these once tests are fixed to trigger reconfig properly _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 71d71290bb5..0420f1b1f22 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -83,12 +83,6 @@ public: int getDistributorIndex() const override { return _component.node_index(); } const ClusterContext& cluster_context() const override { return _component.cluster_context(); } - /** - * Enables a new cluster state. Called after the bucket db updater has - * retrieved all bucket info related to the change. - */ - void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); - void storageDistributionChanged() override; bool handleReply(const std::shared_ptr<api::StorageReply>& reply); @@ -99,26 +93,9 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - std::string getActiveIdealStateOperations() const; - virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; - const lib::ClusterStateBundle& getClusterStateBundle() const; - const DistributorConfiguration& getConfig() const; - - bool isInRecoveryMode() const noexcept; - - PendingMessageTracker& getPendingMessageTracker(); - const PendingMessageTracker& getPendingMessageTracker() const; - - DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; - const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; - - storage::distributor::DistributorStripeComponent& distributor_component() noexcept; - class MetricUpdateHook : public framework::MetricUpdateHook { public: @@ -135,8 +112,6 @@ public: Distributor& _self; }; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; - private: friend struct DistributorTest; friend class BucketDBUpdaterTest; @@ -146,14 +121,31 @@ private: void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); + /** + * Enables a new cluster state. Used by tests to bypass BucketDBUpdater. + */ + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); + // Accessors used by tests + std::string getActiveIdealStateOperations() const; + const lib::ClusterStateBundle& getClusterStateBundle() const; + const DistributorConfiguration& getConfig() const; + bool isInRecoveryMode() const noexcept; + PendingMessageTracker& getPendingMessageTracker(); + const PendingMessageTracker& getPendingMessageTracker() const; + DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; + storage::distributor::DistributorStripeComponent& distributor_component() noexcept; + std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + StripeBucketDBUpdater& bucket_db_updater(); const StripeBucketDBUpdater& bucket_db_updater() const; IdealStateManager& ideal_state_manager(); const IdealStateManager& ideal_state_manager() const; ExternalOperationHandler& external_operation_handler(); const ExternalOperationHandler& external_operation_handler() const; - BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept; /** @@ -177,6 +169,7 @@ private: DistributorComponentRegister& _comp_reg; std::shared_ptr<DistributorMetricSet> _metrics; ChainedMessageSender* _messageSender; + const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. std::unique_ptr<DistributorStripe> _stripe; std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; |