summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-04-29 12:29:45 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-04-29 12:29:45 +0000
commitd1eaf4cd911c334dc2d2aacbd73dfa8e23dca53b (patch)
treef587ad30cfe6d94b75cb099c4648bba891083eae /storage
parent4b98768ff0b2772631e9bcce55568f3171b2015d (diff)
Thread-safe distribution change propagation in top level distributor component
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/top_level_distributor.cpp16
-rw-r--r--storage/src/vespa/storage/distributor/top_level_distributor.h5
3 files changed, 13 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 5dcc7f5c6ad..9677ea568e8 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -81,7 +81,7 @@ TopLevelDistributorTestUtil::setup_distributor(int redundancy,
// triggerDistributionChange().
// This isn't pretty, folks, but it avoids breaking the world for now,
// as many tests have implicit assumptions about this being the behavior.
- _distributor->propagateDefaultDistribution(distribution);
+ _distributor->propagate_default_distribution_thread_unsafe(distribution);
// Explicitly init the stripe pool since onOpen isn't called during testing
_distributor->start_stripe_pool();
enable_distributor_cluster_state(state);
@@ -446,7 +446,7 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di
{
_node->getComponentRegister().setDistribution(std::move(distr));
_distributor->storageDistributionChanged();
- _distributor->enableNextDistribution();
+ _distributor->enable_next_distribution_if_changed();
}
const lib::ClusterStateBundle&
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index 4bc2f141462..e5cc35bd2d3 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -111,7 +111,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
_hostInfoReporter.enableReporting(config().getEnableHostInfoReporting());
hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter);
- propagateDefaultDistribution(_component.getDistribution());
+ propagate_default_distribution_thread_unsafe(_component.getDistribution()); // Stripes not started yet
};
TopLevelDistributor::~TopLevelDistributor()
@@ -306,10 +306,11 @@ TopLevelDistributor::sendReply(const std::shared_ptr<api::StorageReply>& reply)
void
TopLevelDistributor::storageDistributionChanged()
{
+ std::lock_guard guard(_distribution_mutex);
if (!_distribution || (*_component.getDistribution() != *_distribution)) {
LOG(debug, "Distribution changed to %s, must re-fetch bucket information",
_component.getDistribution()->toString().c_str());
- _next_distribution = _component.getDistribution(); // FIXME this is not thread safe
+ _next_distribution = _component.getDistribution();
} else {
LOG(debug, "Got distribution change, but the distribution %s was the same as before: %s",
_component.getDistribution()->toString().c_str(),
@@ -318,20 +319,19 @@ TopLevelDistributor::storageDistributionChanged()
}
void
-TopLevelDistributor::enableNextDistribution()
+TopLevelDistributor::enable_next_distribution_if_changed()
{
+ std::lock_guard guard(_distribution_mutex);
if (_next_distribution) {
_distribution = _next_distribution;
_next_distribution = std::shared_ptr<lib::Distribution>();
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution);
- _bucket_db_updater->storage_distribution_changed(new_configs);
+ _bucket_db_updater->storage_distribution_changed(new_configs); // Transitively updates all stripes' configs
}
}
-// TODO STRIPE only used by tests to directly inject new distribution config
-// - actually, also by ctor
void
-TopLevelDistributor::propagateDefaultDistribution(
+TopLevelDistributor::propagate_default_distribution_thread_unsafe(
std::shared_ptr<const lib::Distribution> distribution)
{
// Should only be called at ctor time, at which point the pool is not yet running.
@@ -417,7 +417,7 @@ framework::ThreadWaitInfo
TopLevelDistributor::doCriticalTick([[maybe_unused]] framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- enableNextDistribution();
+ enable_next_distribution_if_changed();
fetch_status_requests();
fetch_external_messages();
// Propagates any new configs down to stripe(s)
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h
index 95bff77fe40..762c601551f 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.h
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.h
@@ -156,8 +156,8 @@ private:
void handle_status_requests();
void signal_work_was_done();
[[nodiscard]] bool work_was_done() const noexcept;
- void enableNextDistribution();
- void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ void enable_next_distribution_if_changed();
+ void propagate_default_distribution_thread_unsafe(std::shared_ptr<const lib::Distribution> distribution);
void un_inhibit_maintenance_if_safe_time_passed();
void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
@@ -217,6 +217,7 @@ private:
MetricUpdateHook _metricUpdateHook;
DistributorHostInfoReporter _hostInfoReporter;
+ mutable std::mutex _distribution_mutex;
std::shared_ptr<lib::Distribution> _distribution;
std::shared_ptr<lib::Distribution> _next_distribution;