diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-04-29 12:29:45 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-04-29 12:29:45 +0000 |
commit | d1eaf4cd911c334dc2d2aacbd73dfa8e23dca53b (patch) | |
tree | f587ad30cfe6d94b75cb099c4648bba891083eae /storage/src | |
parent | 4b98768ff0b2772631e9bcce55568f3171b2015d (diff) |
Thread-safe distribution change propagation in top level distributor component
Diffstat (limited to 'storage/src')
3 files changed, 13 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 5dcc7f5c6ad..9677ea568e8 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -81,7 +81,7 @@ TopLevelDistributorTestUtil::setup_distributor(int redundancy, // triggerDistributionChange(). // This isn't pretty, folks, but it avoids breaking the world for now, // as many tests have implicit assumptions about this being the behavior. - _distributor->propagateDefaultDistribution(distribution); + _distributor->propagate_default_distribution_thread_unsafe(distribution); // Explicitly init the stripe pool since onOpen isn't called during testing _distributor->start_stripe_pool(); enable_distributor_cluster_state(state); @@ -446,7 +446,7 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di { _node->getComponentRegister().setDistribution(std::move(distr)); _distributor->storageDistributionChanged(); - _distributor->enableNextDistribution(); + _distributor->enable_next_distribution_if_changed(); } const lib::ClusterStateBundle& diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index 4bc2f141462..e5cc35bd2d3 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -111,7 +111,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg, _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter); - propagateDefaultDistribution(_component.getDistribution()); + propagate_default_distribution_thread_unsafe(_component.getDistribution()); // Stripes not started yet }; TopLevelDistributor::~TopLevelDistributor() @@ -306,10 +306,11 @@ TopLevelDistributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) void TopLevelDistributor::storageDistributionChanged() { + std::lock_guard guard(_distribution_mutex); if (!_distribution || (*_component.getDistribution() != *_distribution)) { LOG(debug, "Distribution changed to %s, must re-fetch bucket information", _component.getDistribution()->toString().c_str()); - _next_distribution = _component.getDistribution(); // FIXME this is not thread safe + _next_distribution = _component.getDistribution(); } else { LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s", _component.getDistribution()->toString().c_str(), @@ -318,20 +319,19 @@ TopLevelDistributor::storageDistributionChanged() } void -TopLevelDistributor::enableNextDistribution() +TopLevelDistributor::enable_next_distribution_if_changed() { + std::lock_guard guard(_distribution_mutex); if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); - _bucket_db_updater->storage_distribution_changed(new_configs); + _bucket_db_updater->storage_distribution_changed(new_configs); // Transitively updates all stripes' configs } } -// TODO STRIPE only used by tests to directly inject new distribution config -// - actually, also by ctor void -TopLevelDistributor::propagateDefaultDistribution( +TopLevelDistributor::propagate_default_distribution_thread_unsafe( std::shared_ptr<const lib::Distribution> distribution) { // Should only be called at ctor time, at which point the pool is not yet running. @@ -417,7 +417,7 @@ framework::ThreadWaitInfo TopLevelDistributor::doCriticalTick([[maybe_unused]] framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - enableNextDistribution(); + enable_next_distribution_if_changed(); fetch_status_requests(); fetch_external_messages(); // Propagates any new configs down to stripe(s) diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h index 95bff77fe40..762c601551f 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.h +++ b/storage/src/vespa/storage/distributor/top_level_distributor.h @@ -156,8 +156,8 @@ private: void handle_status_requests(); void signal_work_was_done(); [[nodiscard]] bool work_was_done() const noexcept; - void enableNextDistribution(); - void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); + void enable_next_distribution_if_changed(); + void propagate_default_distribution_thread_unsafe(std::shared_ptr<const lib::Distribution> distribution); void un_inhibit_maintenance_if_safe_time_passed(); void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg); @@ -217,6 +217,7 @@ private: MetricUpdateHook _metricUpdateHook; DistributorHostInfoReporter _hostInfoReporter; + mutable std::mutex _distribution_mutex; std::shared_ptr<lib::Distribution> _distribution; std::shared_ptr<lib::Distribution> _next_distribution; |