diff options
11 files changed, 98 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 9e87aa08cfa..043f996a4a1 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -25,6 +25,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil() _sender(), _senderDown(), _hostInfo(), + _done_initializing(true), _messageSender(_sender, _senderDown) { _config = getStandardConfig(false); @@ -47,7 +48,8 @@ DistributorStripeTestUtil::createLinks() *this, _messageSender, *this, - false); + false, + _done_initializing); } void diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index ae4b4c2913a..ccade98fd01 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -219,6 +219,7 @@ protected: DistributorMessageSenderStub _sender; DistributorMessageSenderStub _senderDown; HostInfo _hostInfo; + bool _done_initializing; struct MessageSenderImpl : public ChainedMessageSender { DistributorMessageSenderStub& _sender; diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp index 11dc79769a7..0a472430e78 100644 --- a/storage/src/tests/distributor/legacy_distributor_test.cpp +++ b/storage/src/tests/distributor/legacy_distributor_test.cpp @@ -301,6 +301,15 @@ TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) { EXPECT_TRUE(distributor_is_in_recovery_mode()); } +TEST_F(LegacyDistributorTest, distributor_considered_initialized_once_self_observed_up) { + setupDistributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D: + EXPECT_FALSE(_distributor->done_initializing()); + enableDistributorClusterState("distributor:1 storage:1"); // We're up :D + EXPECT_TRUE(_distributor->done_initializing()); + enableDistributorClusterState("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state + EXPECT_TRUE(_distributor->done_initializing()); +} + // Migrated to DistributorStripeTest TEST_F(LegacyDistributorTest, operations_are_throttled) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 8fae1c6d738..eb2f8217872 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -162,6 +162,15 @@ TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_trigger EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); } +TEST_F(TopLevelDistributorTest, distributor_considered_initialized_once_self_observed_up) { + setup_distributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D: + EXPECT_FALSE(_distributor->done_initializing()); + enable_distributor_cluster_state("distributor:1 storage:1"); // We're up :D + EXPECT_TRUE(_distributor->done_initializing()); + enable_distributor_cluster_state("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state + EXPECT_TRUE(_distributor->done_initializing()); +} + // TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe TEST_F(TopLevelDistributorTest, contains_time_statement) { setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); diff --git a/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h new file mode 100644 index 00000000000..c3a885367ca --- /dev/null +++ b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +namespace storage::lib { class ClusterStateBundle; } + +namespace storage::distributor { + +/** + * Listener where on_cluster_state_bundle_activated() is invoked by the top-level + * bucket DB updater component upon a cluster state activation edge. + * + * Thread/concurrency note: this listener is always invoked from the top-level + * distributor thread and in a context where all stripe threads are paused. + * This means the callee must not directly or indirectly try to pause stripe + * threads itself, but it may safely modify shared state since no stripe threads + * are active. + */ +class ClusterStateBundleActivationListener { +public: + virtual ~ClusterStateBundleActivationListener() = default; + virtual void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) = 0; +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 1a9cb9f303c..d9381c4abf0 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -43,6 +43,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, bool use_legacy_mode, + bool& done_initializing_ref, uint32_t stripe_index) : DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), @@ -68,7 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _external_message_mutex(), _threadPool(threadPool), _doneInitializeHandler(doneInitHandler), - _doneInitializing(false), + _done_initializing_ref(done_initializing_ref), _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), @@ -299,8 +300,8 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state propagateClusterStates(); const auto& baseline_state = *state.getBaselineClusterState(); - if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { - _doneInitializing = true; + if (_use_legacy_mode && !_done_initializing_ref && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { + _done_initializing_ref = true; // TODO STRIPE remove; responsibility moved to TopLevelDistributor in non-legacy _doneInitializeHandler.notifyDoneInitializing(); } enterRecoveryMode(); @@ -365,9 +366,8 @@ DistributorStripe::leaveRecoveryMode() if (isInRecoveryMode()) { LOG(debug, "Leaving recovery mode"); // FIXME don't use shared metric for this - _metrics.recoveryModeTime.addValue( - _recoveryTimeStarted.getElapsedTimeAsDouble()); - if (_doneInitializing) { + _metrics.recoveryModeTime.addValue(_recoveryTimeStarted.getElapsedTimeAsDouble()); + if (_done_initializing_ref) { _must_send_updated_host_info = true; } } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index b1b20cf445a..6a3f39d7576 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -66,6 +66,7 @@ public: ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, bool use_legacy_mode, + bool& done_initializing_ref, // TODO STRIPE const ref once legacy is gone and stripe can't mutate init state uint32_t stripe_index = 0); ~DistributorStripe() override; @@ -147,7 +148,7 @@ public: void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override; bool initializing() const override { - return !_doneInitializing; + return !_done_initializing_ref; } const DistributorConfiguration& getConfig() const override { @@ -342,8 +343,8 @@ private: mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo; mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests; - DoneInitializeHandler& _doneInitializeHandler; - bool _doneInitializing; + DoneInitializeHandler& _doneInitializeHandler; // TODO STRIPE remove when legacy is gone + bool& _done_initializing_ref; std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; std::unique_ptr<SimpleMaintenanceScanner> _scanner; diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 8c258cda5e4..087ad01d65c 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -32,9 +32,11 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n DistributorInterface& distributor_interface, ChainedMessageSender& chained_sender, std::shared_ptr<const lib::Distribution> bootstrap_distribution, - StripeAccessor& stripe_accessor) + StripeAccessor& stripe_accessor, + ClusterStateBundleActivationListener* state_activation_listener) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _stripe_accessor(stripe_accessor), + _state_activation_listener(state_activation_listener), _active_state_bundle(lib::ClusterState()), _node_ctx(node_ctx), _op_ctx(op_ctx), @@ -61,6 +63,9 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() { iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first)); } } + if (_state_activation_listener) { + _state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle); + } } void @@ -419,12 +424,12 @@ TopLevelBucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_ const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle(); _active_state_bundle = _pending_cluster_state->getNewClusterStateBundle(); + + guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer()); propagate_active_state_bundle_internally(); LOG(debug, "TopLevelBucketDBUpdater finished processing state %s", state.getBaselineClusterState()->toString().c_str()); - - guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer()); } void TopLevelBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state, diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index c9e28eee72d..2eccb70fdf9 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -28,6 +28,7 @@ namespace storage::distributor { struct BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; +class ClusterStateBundleActivationListener; class DistributorInterface; class StripeAccessor; class StripeAccessGuard; @@ -42,7 +43,8 @@ public: DistributorInterface& distributor_interface, ChainedMessageSender& chained_sender, std::shared_ptr<const lib::Distribution> bootstrap_distribution, - StripeAccessor& stripe_accessor); + StripeAccessor& stripe_accessor, + ClusterStateBundleActivationListener* state_activation_listener); ~TopLevelBucketDBUpdater() override; void flush(); @@ -109,6 +111,7 @@ private: // TODO STRIPE remove once distributor component dependencies have been pruned StripeAccessor& _stripe_accessor; + ClusterStateBundleActivationListener* _state_activation_listener; lib::ClusterStateBundle _active_state_bundle; const DistributorNodeContext& _node_ctx; diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index bbef20b2a23..d99a02081d8 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -52,7 +52,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, DistributorStripePool& stripe_pool, - DoneInitializeHandler& doneInitHandler, + DoneInitializeHandler& done_init_handler, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, ChainedMessageSender* messageSender) @@ -60,7 +60,9 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, framework::StatusReporter("distributor", "Distributor"), _node_identity(node_identity), _comp_reg(compReg), + _done_init_handler(done_init_handler), _use_legacy_mode(num_distributor_stripes == 0), + _done_initializing(false), _metrics(std::make_shared<DistributorMetricSet>()), _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), @@ -75,7 +77,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, (_use_legacy_mode ? *_ideal_state_metrics : _ideal_state_total_metrics->stripe(0)), node_identity, threadPool, - doneInitHandler, *this, *this, _use_legacy_mode)), + _done_init_handler, *this, *this, _use_legacy_mode, _done_initializing)), _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), @@ -116,14 +118,16 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, _bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), - *_stripe_accessor); + *_stripe_accessor, + this); _stripes.emplace_back(std::move(_stripe)); for (size_t i = 1; i < num_distributor_stripes; ++i) { _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), _ideal_state_total_metrics->stripe(i), node_identity, threadPool, - doneInitHandler, *this, *this, _use_legacy_mode, i)); + _done_init_handler, *this, *this, _use_legacy_mode, + _done_initializing, i)); } _stripe_scan_stats.resize(num_distributor_stripes); _distributorStatusDelegate.registerStatusPage(); @@ -682,10 +686,10 @@ TopLevelDistributor::enableNextConfig() // TODO STRIPE rename to enable_next_con } } - void TopLevelDistributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index) { + // TODO STRIPE assert(_done_initializing); (can't currently do due to some unit test restrictions; uncomment and find out) LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index); std::lock_guard lock(_stripe_scan_notify_mutex); assert(!_use_legacy_mode); @@ -733,6 +737,18 @@ TopLevelDistributor::send_host_info_if_appropriate() } void +TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle) +{ + assert(!_use_legacy_mode); + lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex()); + if (!_done_initializing && (new_bundle.getBaselineClusterState()->getNodeState(my_node).getState() == lib::State::UP)) { + _done_initializing = true; + _done_init_handler.notifyDoneInitializing(); + } + LOG(debug, "Activated new state version in distributor: %s", new_bundle.toString().c_str()); +} + +void TopLevelDistributor::fetch_status_requests() { if (_fetched_status_requests.empty()) { diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h index 5de4b9c1aaa..ece9a61dfa3 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.h +++ b/storage/src/vespa/storage/distributor/top_level_distributor.h @@ -3,6 +3,7 @@ #pragma once #include "bucket_spaces_stats_provider.h" +#include "cluster_state_bundle_activation_listener.h" #include "top_level_bucket_db_updater.h" #include "distributor_component.h" #include "distributor_host_info_reporter.h" @@ -60,7 +61,8 @@ class TopLevelDistributor final public framework::TickingThread, public MinReplicaProvider, public BucketSpacesStatsProvider, - public StripeHostInfoNotifier + public StripeHostInfoNotifier, + public ClusterStateBundleActivationListener { public: TopLevelDistributor(DistributorComponentRegister&, @@ -86,6 +88,8 @@ public: const NodeIdentity& node_identity() const noexcept { return _node_identity; } + [[nodiscard]] bool done_initializing() const noexcept { return _done_initializing; } + // Implements DistributorInterface and DistributorMessageSender. DistributorMetricSet& metrics() override { return getMetrics(); } const DistributorConfiguration& config() const override; @@ -200,6 +204,9 @@ private: uint32_t random_stripe_idx(); uint32_t stripe_of_bucket_id(const document::BucketId& bucket_id, const api::StorageMessage& msg); + // ClusterStateBundleActivationListener impl: + void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) override; + struct StripeScanStats { bool wants_to_send_host_info = false; bool has_reported_in_at_least_once = false; @@ -209,7 +216,9 @@ private: const NodeIdentity _node_identity; DistributorComponentRegister& _comp_reg; + DoneInitializeHandler& _done_init_handler; const bool _use_legacy_mode; + bool _done_initializing; std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<DistributorTotalMetrics> _total_metrics; std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; |