diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-14 13:01:46 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-14 14:48:56 +0000 |
commit | d35dbfdcdc7288079a97fb93e609c370a1aba89a (patch) | |
tree | 7f927475ac9d759f0a27103e853e5a9b0b2d94ba /storage | |
parent | 92ad19102e936032fe61fa5a93bc88a2fc246a22 (diff) |
Move initializing handling to top-level distributor
Add a listener interface that lets the top-level distributor intercept
cluster state activations and use this for triggering the node init edge.
This happens when all stripes are paused so this is safe from data races.
Legacy code in the DistributorStripe remains for now.
Diffstat (limited to 'storage')
11 files changed, 98 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 9e87aa08cfa..043f996a4a1 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -25,6 +25,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil() _sender(), _senderDown(), _hostInfo(), + _done_initializing(true), _messageSender(_sender, _senderDown) { _config = getStandardConfig(false); @@ -47,7 +48,8 @@ DistributorStripeTestUtil::createLinks() *this, _messageSender, *this, - false); + false, + _done_initializing); } void diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index ae4b4c2913a..ccade98fd01 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -219,6 +219,7 @@ protected: DistributorMessageSenderStub _sender; DistributorMessageSenderStub _senderDown; HostInfo _hostInfo; + bool _done_initializing; struct MessageSenderImpl : public ChainedMessageSender { DistributorMessageSenderStub& _sender; diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp index 11dc79769a7..0a472430e78 100644 --- a/storage/src/tests/distributor/legacy_distributor_test.cpp +++ b/storage/src/tests/distributor/legacy_distributor_test.cpp @@ -301,6 +301,15 @@ TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) { EXPECT_TRUE(distributor_is_in_recovery_mode()); } +TEST_F(LegacyDistributorTest, distributor_considered_initialized_once_self_observed_up) { + setupDistributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D: + EXPECT_FALSE(_distributor->done_initializing()); + enableDistributorClusterState("distributor:1 storage:1"); // We're up :D + EXPECT_TRUE(_distributor->done_initializing()); + enableDistributorClusterState("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state + EXPECT_TRUE(_distributor->done_initializing()); +} + // Migrated to DistributorStripeTest TEST_F(LegacyDistributorTest, operations_are_throttled) { setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 8fae1c6d738..eb2f8217872 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -162,6 +162,15 @@ TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_trigger EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); } +TEST_F(TopLevelDistributorTest, distributor_considered_initialized_once_self_observed_up) { + setup_distributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D: + EXPECT_FALSE(_distributor->done_initializing()); + enable_distributor_cluster_state("distributor:1 storage:1"); // We're up :D + EXPECT_TRUE(_distributor->done_initializing()); + enable_distributor_cluster_state("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state + EXPECT_TRUE(_distributor->done_initializing()); +} + // TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe TEST_F(TopLevelDistributorTest, contains_time_statement) { setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); diff --git a/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h new file mode 100644 index 00000000000..c3a885367ca --- /dev/null +++ b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +namespace storage::lib { class ClusterStateBundle; } + +namespace storage::distributor { + +/** + * Listener where on_cluster_state_bundle_activated() is invoked by the top-level + * bucket DB updater component upon a cluster state activation edge. + * + * Thread/concurrency note: this listener is always invoked from the top-level + * distributor thread and in a context where all stripe threads are paused. + * This means the callee must not directly or indirectly try to pause stripe + * threads itself, but it may safely modify shared state since no stripe threads + * are active. + */ +class ClusterStateBundleActivationListener { +public: + virtual ~ClusterStateBundleActivationListener() = default; + virtual void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) = 0; +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 1a9cb9f303c..d9381c4abf0 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -43,6 +43,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, bool use_legacy_mode, + bool& done_initializing_ref, uint32_t stripe_index) : DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), @@ -68,7 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _external_message_mutex(), _threadPool(threadPool), _doneInitializeHandler(doneInitHandler), - _doneInitializing(false), + _done_initializing_ref(done_initializing_ref), _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), @@ -299,8 +300,8 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state propagateClusterStates(); const auto& baseline_state = *state.getBaselineClusterState(); - if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { - _doneInitializing = true; + if (_use_legacy_mode && !_done_initializing_ref && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { + _done_initializing_ref = true; // TODO STRIPE remove; responsibility moved to TopLevelDistributor in non-legacy _doneInitializeHandler.notifyDoneInitializing(); } enterRecoveryMode(); @@ -365,9 +366,8 @@ DistributorStripe::leaveRecoveryMode() if (isInRecoveryMode()) { LOG(debug, "Leaving recovery mode"); // FIXME don't use shared metric for this - _metrics.recoveryModeTime.addValue( - _recoveryTimeStarted.getElapsedTimeAsDouble()); - if (_doneInitializing) { + _metrics.recoveryModeTime.addValue(_recoveryTimeStarted.getElapsedTimeAsDouble()); + if (_done_initializing_ref) { _must_send_updated_host_info = true; } } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index b1b20cf445a..6a3f39d7576 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -66,6 +66,7 @@ public: ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, bool use_legacy_mode, + bool& done_initializing_ref, // TODO STRIPE const ref once legacy is gone and stripe can't mutate init state uint32_t stripe_index = 0); ~DistributorStripe() override; @@ -147,7 +148,7 @@ public: void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override; bool initializing() const override { - return !_doneInitializing; + return !_done_initializing_ref; } const DistributorConfiguration& getConfig() const override { @@ -342,8 +343,8 @@ private: mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo; mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests; - DoneInitializeHandler& _doneInitializeHandler; - bool _doneInitializing; + DoneInitializeHandler& _doneInitializeHandler; // TODO STRIPE remove when legacy is gone + bool& _done_initializing_ref; std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; std::unique_ptr<SimpleMaintenanceScanner> _scanner; diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 8c258cda5e4..087ad01d65c 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -32,9 +32,11 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n DistributorInterface& distributor_interface, ChainedMessageSender& chained_sender, std::shared_ptr<const lib::Distribution> bootstrap_distribution, - StripeAccessor& stripe_accessor) + StripeAccessor& stripe_accessor, + ClusterStateBundleActivationListener* state_activation_listener) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _stripe_accessor(stripe_accessor), + _state_activation_listener(state_activation_listener), _active_state_bundle(lib::ClusterState()), _node_ctx(node_ctx), _op_ctx(op_ctx), @@ -61,6 +63,9 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() { iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first)); } } + if (_state_activation_listener) { + _state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle); + } } void @@ -419,12 +424,12 @@ TopLevelBucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_ const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle(); _active_state_bundle = _pending_cluster_state->getNewClusterStateBundle(); + + guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer()); propagate_active_state_bundle_internally(); LOG(debug, "TopLevelBucketDBUpdater finished processing state %s", state.getBaselineClusterState()->toString().c_str()); - - guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer()); } void TopLevelBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state, diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index c9e28eee72d..2eccb70fdf9 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -28,6 +28,7 @@ namespace storage::distributor { struct BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; +class ClusterStateBundleActivationListener; class DistributorInterface; class StripeAccessor; class StripeAccessGuard; @@ -42,7 +43,8 @@ public: DistributorInterface& distributor_interface, ChainedMessageSender& chained_sender, std::shared_ptr<const lib::Distribution> bootstrap_distribution, - StripeAccessor& stripe_accessor); + StripeAccessor& stripe_accessor, + ClusterStateBundleActivationListener* state_activation_listener); ~TopLevelBucketDBUpdater() override; void flush(); @@ -109,6 +111,7 @@ private: // TODO STRIPE remove once distributor component dependencies have been pruned StripeAccessor& _stripe_accessor; + ClusterStateBundleActivationListener* _state_activation_listener; lib::ClusterStateBundle _active_state_bundle; const DistributorNodeContext& _node_ctx; diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index bbef20b2a23..d99a02081d8 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -52,7 +52,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, DistributorStripePool& stripe_pool, - DoneInitializeHandler& doneInitHandler, + DoneInitializeHandler& done_init_handler, uint32_t num_distributor_stripes, HostInfo& hostInfoReporterRegistrar, ChainedMessageSender* messageSender) @@ -60,7 +60,9 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, framework::StatusReporter("distributor", "Distributor"), _node_identity(node_identity), _comp_reg(compReg), + _done_init_handler(done_init_handler), _use_legacy_mode(num_distributor_stripes == 0), + _done_initializing(false), _metrics(std::make_shared<DistributorMetricSet>()), _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), @@ -75,7 +77,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, (_use_legacy_mode ? *_ideal_state_metrics : _ideal_state_total_metrics->stripe(0)), node_identity, threadPool, - doneInitHandler, *this, *this, _use_legacy_mode)), + _done_init_handler, *this, *this, _use_legacy_mode, _done_initializing)), _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), @@ -116,14 +118,16 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, _bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), - *_stripe_accessor); + *_stripe_accessor, + this); _stripes.emplace_back(std::move(_stripe)); for (size_t i = 1; i < num_distributor_stripes; ++i) { _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), _ideal_state_total_metrics->stripe(i), node_identity, threadPool, - doneInitHandler, *this, *this, _use_legacy_mode, i)); + _done_init_handler, *this, *this, _use_legacy_mode, + _done_initializing, i)); } _stripe_scan_stats.resize(num_distributor_stripes); _distributorStatusDelegate.registerStatusPage(); @@ -682,10 +686,10 @@ TopLevelDistributor::enableNextConfig() // TODO STRIPE rename to enable_next_con } } - void TopLevelDistributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index) { + // TODO STRIPE assert(_done_initializing); (can't currently do due to some unit test restrictions; uncomment and find out) LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index); std::lock_guard lock(_stripe_scan_notify_mutex); assert(!_use_legacy_mode); @@ -733,6 +737,18 @@ TopLevelDistributor::send_host_info_if_appropriate() } void +TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle) +{ + assert(!_use_legacy_mode); + lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex()); + if (!_done_initializing && (new_bundle.getBaselineClusterState()->getNodeState(my_node).getState() == lib::State::UP)) { + _done_initializing = true; + _done_init_handler.notifyDoneInitializing(); + } + LOG(debug, "Activated new state version in distributor: %s", new_bundle.toString().c_str()); +} + +void TopLevelDistributor::fetch_status_requests() { if (_fetched_status_requests.empty()) { diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h index 5de4b9c1aaa..ece9a61dfa3 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.h +++ b/storage/src/vespa/storage/distributor/top_level_distributor.h @@ -3,6 +3,7 @@ #pragma once #include "bucket_spaces_stats_provider.h" +#include "cluster_state_bundle_activation_listener.h" #include "top_level_bucket_db_updater.h" #include "distributor_component.h" #include "distributor_host_info_reporter.h" @@ -60,7 +61,8 @@ class TopLevelDistributor final public framework::TickingThread, public MinReplicaProvider, public BucketSpacesStatsProvider, - public StripeHostInfoNotifier + public StripeHostInfoNotifier, + public ClusterStateBundleActivationListener { public: TopLevelDistributor(DistributorComponentRegister&, @@ -86,6 +88,8 @@ public: const NodeIdentity& node_identity() const noexcept { return _node_identity; } + [[nodiscard]] bool done_initializing() const noexcept { return _done_initializing; } + // Implements DistributorInterface and DistributorMessageSender. DistributorMetricSet& metrics() override { return getMetrics(); } const DistributorConfiguration& config() const override; @@ -200,6 +204,9 @@ private: uint32_t random_stripe_idx(); uint32_t stripe_of_bucket_id(const document::BucketId& bucket_id, const api::StorageMessage& msg); + // ClusterStateBundleActivationListener impl: + void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) override; + struct StripeScanStats { bool wants_to_send_host_info = false; bool has_reported_in_at_least_once = false; @@ -209,7 +216,9 @@ private: const NodeIdentity _node_identity; DistributorComponentRegister& _comp_reg; + DoneInitializeHandler& _done_init_handler; const bool _use_legacy_mode; + bool _done_initializing; std::shared_ptr<DistributorMetricSet> _metrics; std::shared_ptr<DistributorTotalMetrics> _total_metrics; std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics; |