summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahooinc.com> 2021-09-14 13:01:46 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com> 2021-09-14 14:48:56 +0000
commit d35dbfdcdc7288079a97fb93e609c370a1aba89a (patch)
tree 7f927475ac9d759f0a27103e853e5a9b0b2d94ba /storage
parent 92ad19102e936032fe61fa5a93bc88a2fc246a22 (diff)
Move initializing handling to top-level distributor
Add a listener interface that lets the top-level distributor intercept cluster state activations, and use it to trigger the node init edge. The listener is invoked while all stripes are paused, so this is safe from data races. Legacy code in the DistributorStripe remains for now.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.cpp 4
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.h 1
-rw-r--r-- storage/src/tests/distributor/legacy_distributor_test.cpp 9
-rw-r--r-- storage/src/tests/distributor/top_level_distributor_test.cpp 9
-rw-r--r-- storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h 23
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp 12
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.h 7
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp 11
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h 5
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_distributor.cpp 26
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_distributor.h 11
11 files changed, 98 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 9e87aa08cfa..043f996a4a1 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -25,6 +25,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil()
_sender(),
_senderDown(),
_hostInfo(),
+ _done_initializing(true),
_messageSender(_sender, _senderDown)
{
_config = getStandardConfig(false);
@@ -47,7 +48,8 @@ DistributorStripeTestUtil::createLinks()
*this,
_messageSender,
*this,
- false);
+ false,
+ _done_initializing);
}
void
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index ae4b4c2913a..ccade98fd01 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -219,6 +219,7 @@ protected:
DistributorMessageSenderStub _sender;
DistributorMessageSenderStub _senderDown;
HostInfo _hostInfo;
+ bool _done_initializing;
struct MessageSenderImpl : public ChainedMessageSender {
DistributorMessageSenderStub& _sender;
diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp
index 11dc79769a7..0a472430e78 100644
--- a/storage/src/tests/distributor/legacy_distributor_test.cpp
+++ b/storage/src/tests/distributor/legacy_distributor_test.cpp
@@ -301,6 +301,15 @@ TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) {
EXPECT_TRUE(distributor_is_in_recovery_mode());
}
+TEST_F(LegacyDistributorTest, distributor_considered_initialized_once_self_observed_up) {
+ setupDistributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D:
+ EXPECT_FALSE(_distributor->done_initializing());
+ enableDistributorClusterState("distributor:1 storage:1"); // We're up :D
+ EXPECT_TRUE(_distributor->done_initializing());
+ enableDistributorClusterState("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state
+ EXPECT_TRUE(_distributor->done_initializing());
+}
+
// Migrated to DistributorStripeTest
TEST_F(LegacyDistributorTest, operations_are_throttled) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index 8fae1c6d738..eb2f8217872 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -162,6 +162,15 @@ TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_trigger
EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
}
+TEST_F(TopLevelDistributorTest, distributor_considered_initialized_once_self_observed_up) {
+ setup_distributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // We're down D:
+ EXPECT_FALSE(_distributor->done_initializing());
+ enable_distributor_cluster_state("distributor:1 storage:1"); // We're up :D
+ EXPECT_TRUE(_distributor->done_initializing());
+ enable_distributor_cluster_state("distributor:1 .0.s:d storage:1"); // And down again :I but that does not change init state
+ EXPECT_TRUE(_distributor->done_initializing());
+}
+
// TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe
TEST_F(TopLevelDistributorTest, contains_time_statement) {
setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
diff --git a/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h
new file mode 100644
index 00000000000..c3a885367ca
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+namespace storage::lib { class ClusterStateBundle; }
+
+namespace storage::distributor {
+
+/**
+ * Listener where on_cluster_state_bundle_activated() is invoked by the top-level
+ * bucket DB updater component upon a cluster state activation edge.
+ *
+ * Thread/concurrency note: this listener is always invoked from the top-level
+ * distributor thread and in a context where all stripe threads are paused.
+ * This means the callee must not directly or indirectly try to pause stripe
+ * threads itself, but it may safely modify shared state since no stripe threads
+ * are active.
+ */
+class ClusterStateBundleActivationListener {
+public:
+ virtual ~ClusterStateBundleActivationListener() = default;
+ virtual void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1a9cb9f303c..d9381c4abf0 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -43,6 +43,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode,
+ bool& done_initializing_ref,
uint32_t stripe_index)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
@@ -68,7 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_external_message_mutex(),
_threadPool(threadPool),
_doneInitializeHandler(doneInitHandler),
- _doneInitializing(false),
+ _done_initializing_ref(done_initializing_ref),
_bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
_scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
@@ -299,8 +300,8 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
propagateClusterStates();
const auto& baseline_state = *state.getBaselineClusterState();
- if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) {
- _doneInitializing = true;
+ if (_use_legacy_mode && !_done_initializing_ref && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) {
+ _done_initializing_ref = true; // TODO STRIPE remove; responsibility moved to TopLevelDistributor in non-legacy
_doneInitializeHandler.notifyDoneInitializing();
}
enterRecoveryMode();
@@ -365,9 +366,8 @@ DistributorStripe::leaveRecoveryMode()
if (isInRecoveryMode()) {
LOG(debug, "Leaving recovery mode");
// FIXME don't use shared metric for this
- _metrics.recoveryModeTime.addValue(
- _recoveryTimeStarted.getElapsedTimeAsDouble());
- if (_doneInitializing) {
+ _metrics.recoveryModeTime.addValue(_recoveryTimeStarted.getElapsedTimeAsDouble());
+ if (_done_initializing_ref) {
_must_send_updated_host_info = true;
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index b1b20cf445a..6a3f39d7576 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -66,6 +66,7 @@ public:
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode,
+ bool& done_initializing_ref, // TODO STRIPE const ref once legacy is gone and stripe can't mutate init state
uint32_t stripe_index = 0);
~DistributorStripe() override;
@@ -147,7 +148,7 @@ public:
void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool initializing() const override {
- return !_doneInitializing;
+ return !_done_initializing_ref;
}
const DistributorConfiguration& getConfig() const override {
@@ -342,8 +343,8 @@ private:
mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo;
mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests;
- DoneInitializeHandler& _doneInitializeHandler;
- bool _doneInitializing;
+ DoneInitializeHandler& _doneInitializeHandler; // TODO STRIPE remove when legacy is gone
+ bool& _done_initializing_ref;
std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
std::unique_ptr<SimpleMaintenanceScanner> _scanner;
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 8c258cda5e4..087ad01d65c 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -32,9 +32,11 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n
DistributorInterface& distributor_interface,
ChainedMessageSender& chained_sender,
std::shared_ptr<const lib::Distribution> bootstrap_distribution,
- StripeAccessor& stripe_accessor)
+ StripeAccessor& stripe_accessor,
+ ClusterStateBundleActivationListener* state_activation_listener)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_stripe_accessor(stripe_accessor),
+ _state_activation_listener(state_activation_listener),
_active_state_bundle(lib::ClusterState()),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
@@ -61,6 +63,9 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() {
iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
}
}
+ if (_state_activation_listener) {
+ _state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle);
+ }
}
void
@@ -419,12 +424,12 @@ TopLevelBucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_
const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle();
_active_state_bundle = _pending_cluster_state->getNewClusterStateBundle();
+
+ guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer());
propagate_active_state_bundle_internally();
LOG(debug, "TopLevelBucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
-
- guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer());
}
void TopLevelBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state,
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index c9e28eee72d..2eccb70fdf9 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -28,6 +28,7 @@ namespace storage::distributor {
struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
+class ClusterStateBundleActivationListener;
class DistributorInterface;
class StripeAccessor;
class StripeAccessGuard;
@@ -42,7 +43,8 @@ public:
DistributorInterface& distributor_interface,
ChainedMessageSender& chained_sender,
std::shared_ptr<const lib::Distribution> bootstrap_distribution,
- StripeAccessor& stripe_accessor);
+ StripeAccessor& stripe_accessor,
+ ClusterStateBundleActivationListener* state_activation_listener);
~TopLevelBucketDBUpdater() override;
void flush();
@@ -109,6 +111,7 @@ private:
// TODO STRIPE remove once distributor component dependencies have been pruned
StripeAccessor& _stripe_accessor;
+ ClusterStateBundleActivationListener* _state_activation_listener;
lib::ClusterStateBundle _active_state_bundle;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index bbef20b2a23..d99a02081d8 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -52,7 +52,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
DistributorStripePool& stripe_pool,
- DoneInitializeHandler& doneInitHandler,
+ DoneInitializeHandler& done_init_handler,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
ChainedMessageSender* messageSender)
@@ -60,7 +60,9 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
framework::StatusReporter("distributor", "Distributor"),
_node_identity(node_identity),
_comp_reg(compReg),
+ _done_init_handler(done_init_handler),
_use_legacy_mode(num_distributor_stripes == 0),
+ _done_initializing(false),
_metrics(std::make_shared<DistributorMetricSet>()),
_total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>()
: std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
@@ -75,7 +77,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
(_use_legacy_mode ? *_ideal_state_metrics
: _ideal_state_total_metrics->stripe(0)),
node_identity, threadPool,
- doneInitHandler, *this, *this, _use_legacy_mode)),
+ _done_init_handler, *this, *this, _use_legacy_mode, _done_initializing)),
_stripe_pool(stripe_pool),
_stripes(),
_stripe_accessor(),
@@ -116,14 +118,16 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
_bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
- *_stripe_accessor);
+ *_stripe_accessor,
+ this);
_stripes.emplace_back(std::move(_stripe));
for (size_t i = 1; i < num_distributor_stripes; ++i) {
_stripes.emplace_back(std::make_unique<DistributorStripe>(compReg,
_total_metrics->stripe(i),
_ideal_state_total_metrics->stripe(i),
node_identity, threadPool,
- doneInitHandler, *this, *this, _use_legacy_mode, i));
+ _done_init_handler, *this, *this, _use_legacy_mode,
+ _done_initializing, i));
}
_stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
@@ -682,10 +686,10 @@ TopLevelDistributor::enableNextConfig() // TODO STRIPE rename to enable_next_con
}
}
-
void
TopLevelDistributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index)
{
+ // TODO STRIPE assert(_done_initializing); (can't currently do due to some unit test restrictions; uncomment and find out)
LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index);
std::lock_guard lock(_stripe_scan_notify_mutex);
assert(!_use_legacy_mode);
@@ -733,6 +737,18 @@ TopLevelDistributor::send_host_info_if_appropriate()
}
void
+TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle)
+{
+ assert(!_use_legacy_mode);
+ lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex());
+ if (!_done_initializing && (new_bundle.getBaselineClusterState()->getNodeState(my_node).getState() == lib::State::UP)) {
+ _done_initializing = true;
+ _done_init_handler.notifyDoneInitializing();
+ }
+ LOG(debug, "Activated new state version in distributor: %s", new_bundle.toString().c_str());
+}
+
+void
TopLevelDistributor::fetch_status_requests()
{
if (_fetched_status_requests.empty()) {
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h
index 5de4b9c1aaa..ece9a61dfa3 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.h
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.h
@@ -3,6 +3,7 @@
#pragma once
#include "bucket_spaces_stats_provider.h"
+#include "cluster_state_bundle_activation_listener.h"
#include "top_level_bucket_db_updater.h"
#include "distributor_component.h"
#include "distributor_host_info_reporter.h"
@@ -60,7 +61,8 @@ class TopLevelDistributor final
public framework::TickingThread,
public MinReplicaProvider,
public BucketSpacesStatsProvider,
- public StripeHostInfoNotifier
+ public StripeHostInfoNotifier,
+ public ClusterStateBundleActivationListener
{
public:
TopLevelDistributor(DistributorComponentRegister&,
@@ -86,6 +88,8 @@ public:
const NodeIdentity& node_identity() const noexcept { return _node_identity; }
+ [[nodiscard]] bool done_initializing() const noexcept { return _done_initializing; }
+
// Implements DistributorInterface and DistributorMessageSender.
DistributorMetricSet& metrics() override { return getMetrics(); }
const DistributorConfiguration& config() const override;
@@ -200,6 +204,9 @@ private:
uint32_t random_stripe_idx();
uint32_t stripe_of_bucket_id(const document::BucketId& bucket_id, const api::StorageMessage& msg);
+ // ClusterStateBundleActivationListener impl:
+ void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) override;
+
struct StripeScanStats {
bool wants_to_send_host_info = false;
bool has_reported_in_at_least_once = false;
@@ -209,7 +216,9 @@ private:
const NodeIdentity _node_identity;
DistributorComponentRegister& _comp_reg;
+ DoneInitializeHandler& _done_init_handler;
const bool _use_legacy_mode;
+ bool _done_initializing;
std::shared_ptr<DistributorMetricSet> _metrics;
std::shared_ptr<DistributorTotalMetrics> _total_metrics;
std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;