summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.cpp | 4
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.h | 1
-rw-r--r-- storage/src/tests/distributor/legacy_distributor_test.cpp | 9
-rw-r--r-- storage/src/tests/distributor/top_level_distributor_test.cpp | 9
-rw-r--r-- storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h | 23
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp | 12
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.h | 7
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp | 11
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h | 5
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_distributor.cpp | 26
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_distributor.h | 11
11 files changed, 98 insertions, 20 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 9e87aa08cfa..043f996a4a1 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -25,6 +25,7 @@ DistributorStripeTestUtil::DistributorStripeTestUtil()
_sender(),
_senderDown(),
_hostInfo(),
+ _done_initializing(true),
_messageSender(_sender, _senderDown)
{
_config = getStandardConfig(false);
@@ -47,7 +48,8 @@ DistributorStripeTestUtil::createLinks()
*this,
_messageSender,
*this,
- false);
+ false,
+ _done_initializing);
}
void
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index ae4b4c2913a..ccade98fd01 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -219,6 +219,7 @@ protected:
DistributorMessageSenderStub _sender;
DistributorMessageSenderStub _senderDown;
HostInfo _hostInfo;
+ bool _done_initializing;
struct MessageSenderImpl : public ChainedMessageSender {
DistributorMessageSenderStub& _sender;
diff --git a/storage/src/tests/distributor/legacy_distributor_test.cpp b/storage/src/tests/distributor/legacy_distributor_test.cpp
index 11dc79769a7..0a472430e78 100644
--- a/storage/src/tests/distributor/legacy_distributor_test.cpp
+++ b/storage/src/tests/distributor/legacy_distributor_test.cpp
@@ -301,6 +301,15 @@ TEST_F(LegacyDistributorTest, recovery_mode_on_cluster_state_change) {
EXPECT_TRUE(distributor_is_in_recovery_mode());
}
+TEST_F(LegacyDistributorTest, distributor_considered_initialized_once_self_observed_up) {
+ setupDistributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // Initial state marks our own distributor node as down (.0.s:d)
+ EXPECT_FALSE(_distributor->done_initializing()); // Not initialized until we have observed ourselves as up
+ enableDistributorClusterState("distributor:1 storage:1"); // Our own node is now up in the enabled state
+ EXPECT_TRUE(_distributor->done_initializing());
+ enableDistributorClusterState("distributor:1 .0.s:d storage:1"); // Marked down again; this must not change the init state
+ EXPECT_TRUE(_distributor->done_initializing()); // Init flag is expected to stay set once it has been set
+}
+
// Migrated to DistributorStripeTest
TEST_F(LegacyDistributorTest, operations_are_throttled) {
setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index 8fae1c6d738..eb2f8217872 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -162,6 +162,15 @@ TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_trigger
EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
}
+TEST_F(TopLevelDistributorTest, distributor_considered_initialized_once_self_observed_up) {
+ setup_distributor(Redundancy(1), NodeCount(2), "distributor:1 .0.s:d storage:1"); // Initial state marks our own distributor node as down (.0.s:d)
+ EXPECT_FALSE(_distributor->done_initializing()); // Not initialized until we have observed ourselves as up
+ enable_distributor_cluster_state("distributor:1 storage:1"); // Our own node is now up in the enabled state
+ EXPECT_TRUE(_distributor->done_initializing());
+ enable_distributor_cluster_state("distributor:1 .0.s:d storage:1"); // Marked down again; this must not change the init state
+ EXPECT_TRUE(_distributor->done_initializing()); // Init flag is expected to stay set once it has been set
+}
+
// TODO STRIPE consider moving to generic test, not specific to top-level distributor or stripe
TEST_F(TopLevelDistributorTest, contains_time_statement) {
setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
diff --git a/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h
new file mode 100644
index 00000000000..c3a885367ca
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cluster_state_bundle_activation_listener.h
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace storage::lib { class ClusterStateBundle; } // Forward declaration; only used by const reference below
+
+namespace storage::distributor {
+/**
+ * Listener where on_cluster_state_bundle_activated() is invoked by the top-level
+ * bucket DB updater component upon a cluster state activation edge.
+ *
+ * Thread/concurrency note: this listener is always invoked from the top-level
+ * distributor thread and in a context where all stripe threads are paused.
+ * This means the callee must not directly or indirectly try to pause stripe
+ * threads itself, but it may safely modify shared state since no stripe threads
+ * are active.
+ */
+class ClusterStateBundleActivationListener {
+public:
+    virtual ~ClusterStateBundleActivationListener() = default;
+    virtual void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 1a9cb9f303c..d9381c4abf0 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -43,6 +43,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode,
+ bool& done_initializing_ref,
uint32_t stripe_index)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
@@ -68,7 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_external_message_mutex(),
_threadPool(threadPool),
_doneInitializeHandler(doneInitHandler),
- _doneInitializing(false),
+ _done_initializing_ref(done_initializing_ref),
_bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()),
_scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)),
_throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)),
@@ -299,8 +300,8 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
propagateClusterStates();
const auto& baseline_state = *state.getBaselineClusterState();
- if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) {
- _doneInitializing = true;
+ if (_use_legacy_mode && !_done_initializing_ref && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) {
+ _done_initializing_ref = true; // TODO STRIPE remove; responsibility moved to TopLevelDistributor in non-legacy
_doneInitializeHandler.notifyDoneInitializing();
}
enterRecoveryMode();
@@ -365,9 +366,8 @@ DistributorStripe::leaveRecoveryMode()
if (isInRecoveryMode()) {
LOG(debug, "Leaving recovery mode");
// FIXME don't use shared metric for this
- _metrics.recoveryModeTime.addValue(
- _recoveryTimeStarted.getElapsedTimeAsDouble());
- if (_doneInitializing) {
+ _metrics.recoveryModeTime.addValue(_recoveryTimeStarted.getElapsedTimeAsDouble());
+ if (_done_initializing_ref) {
_must_send_updated_host_info = true;
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index b1b20cf445a..6a3f39d7576 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -66,6 +66,7 @@ public:
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
bool use_legacy_mode,
+ bool& done_initializing_ref, // TODO STRIPE const ref once legacy is gone and stripe can't mutate init state
uint32_t stripe_index = 0);
~DistributorStripe() override;
@@ -147,7 +148,7 @@ public:
void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool initializing() const override {
- return !_doneInitializing;
+ return !_done_initializing_ref;
}
const DistributorConfiguration& getConfig() const override {
@@ -342,8 +343,8 @@ private:
mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo;
mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests;
- DoneInitializeHandler& _doneInitializeHandler;
- bool _doneInitializing;
+ DoneInitializeHandler& _doneInitializeHandler; // TODO STRIPE remove when legacy is gone
+ bool& _done_initializing_ref;
std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
std::unique_ptr<SimpleMaintenanceScanner> _scanner;
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 8c258cda5e4..087ad01d65c 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -32,9 +32,11 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n
DistributorInterface& distributor_interface,
ChainedMessageSender& chained_sender,
std::shared_ptr<const lib::Distribution> bootstrap_distribution,
- StripeAccessor& stripe_accessor)
+ StripeAccessor& stripe_accessor,
+ ClusterStateBundleActivationListener* state_activation_listener)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_stripe_accessor(stripe_accessor),
+ _state_activation_listener(state_activation_listener),
_active_state_bundle(lib::ClusterState()),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
@@ -61,6 +63,9 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() {
iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
}
}
+ if (_state_activation_listener) {
+ _state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle);
+ }
}
void
@@ -419,12 +424,12 @@ TopLevelBucketDBUpdater::enable_current_cluster_state_bundle_in_distributor_and_
const lib::ClusterStateBundle& state = _pending_cluster_state->getNewClusterStateBundle();
_active_state_bundle = _pending_cluster_state->getNewClusterStateBundle();
+
+ guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer());
propagate_active_state_bundle_internally();
LOG(debug, "TopLevelBucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
-
- guard.enable_cluster_state_bundle(state, _pending_cluster_state->hasBucketOwnershipTransfer());
}
void TopLevelBucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state,
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index c9e28eee72d..2eccb70fdf9 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -28,6 +28,7 @@ namespace storage::distributor {
struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
+class ClusterStateBundleActivationListener;
class DistributorInterface;
class StripeAccessor;
class StripeAccessGuard;
@@ -42,7 +43,8 @@ public:
DistributorInterface& distributor_interface,
ChainedMessageSender& chained_sender,
std::shared_ptr<const lib::Distribution> bootstrap_distribution,
- StripeAccessor& stripe_accessor);
+ StripeAccessor& stripe_accessor,
+ ClusterStateBundleActivationListener* state_activation_listener);
~TopLevelBucketDBUpdater() override;
void flush();
@@ -109,6 +111,7 @@ private:
// TODO STRIPE remove once distributor component dependencies have been pruned
StripeAccessor& _stripe_accessor;
+ ClusterStateBundleActivationListener* _state_activation_listener;
lib::ClusterStateBundle _active_state_bundle;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index bbef20b2a23..d99a02081d8 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -52,7 +52,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
DistributorStripePool& stripe_pool,
- DoneInitializeHandler& doneInitHandler,
+ DoneInitializeHandler& done_init_handler,
uint32_t num_distributor_stripes,
HostInfo& hostInfoReporterRegistrar,
ChainedMessageSender* messageSender)
@@ -60,7 +60,9 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
framework::StatusReporter("distributor", "Distributor"),
_node_identity(node_identity),
_comp_reg(compReg),
+ _done_init_handler(done_init_handler),
_use_legacy_mode(num_distributor_stripes == 0),
+ _done_initializing(false),
_metrics(std::make_shared<DistributorMetricSet>()),
_total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>()
: std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
@@ -75,7 +77,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
(_use_legacy_mode ? *_ideal_state_metrics
: _ideal_state_total_metrics->stripe(0)),
node_identity, threadPool,
- doneInitHandler, *this, *this, _use_legacy_mode)),
+ _done_init_handler, *this, *this, _use_legacy_mode, _done_initializing)),
_stripe_pool(stripe_pool),
_stripes(),
_stripe_accessor(),
@@ -116,14 +118,16 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
_bucket_db_updater = std::make_unique<TopLevelBucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
- *_stripe_accessor);
+ *_stripe_accessor,
+ this);
_stripes.emplace_back(std::move(_stripe));
for (size_t i = 1; i < num_distributor_stripes; ++i) {
_stripes.emplace_back(std::make_unique<DistributorStripe>(compReg,
_total_metrics->stripe(i),
_ideal_state_total_metrics->stripe(i),
node_identity, threadPool,
- doneInitHandler, *this, *this, _use_legacy_mode, i));
+ _done_init_handler, *this, *this, _use_legacy_mode,
+ _done_initializing, i));
}
_stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
@@ -682,10 +686,10 @@ TopLevelDistributor::enableNextConfig() // TODO STRIPE rename to enable_next_con
}
}
-
void
TopLevelDistributor::notify_stripe_wants_to_send_host_info(uint16_t stripe_index)
{
+ // TODO STRIPE assert(_done_initializing); (can't currently do due to some unit test restrictions; uncomment and find out)
LOG(debug, "Stripe %u has signalled an intent to send host info out-of-band", stripe_index);
std::lock_guard lock(_stripe_scan_notify_mutex);
assert(!_use_legacy_mode);
@@ -733,6 +737,18 @@ TopLevelDistributor::send_host_info_if_appropriate()
}
void
+TopLevelDistributor::on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle)
+{
+ assert(!_use_legacy_mode); // In legacy mode the stripe flips the init flag itself (see DistributorStripe::enableClusterStateBundle)
+ lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex());
+ if (!_done_initializing && (new_bundle.getBaselineClusterState()->getNodeState(my_node).getState() == lib::State::UP)) {
+ _done_initializing = true; // One-way transition: nothing in this function clears the flag again
+ _done_init_handler.notifyDoneInitializing(); // Guarded by the flag check above, so sent at most once from this path
+ }
+ LOG(debug, "Activated new state version in distributor: %s", new_bundle.toString().c_str());
+}
+
+void
TopLevelDistributor::fetch_status_requests()
{
if (_fetched_status_requests.empty()) {
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h
index 5de4b9c1aaa..ece9a61dfa3 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.h
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.h
@@ -3,6 +3,7 @@
#pragma once
#include "bucket_spaces_stats_provider.h"
+#include "cluster_state_bundle_activation_listener.h"
#include "top_level_bucket_db_updater.h"
#include "distributor_component.h"
#include "distributor_host_info_reporter.h"
@@ -60,7 +61,8 @@ class TopLevelDistributor final
public framework::TickingThread,
public MinReplicaProvider,
public BucketSpacesStatsProvider,
- public StripeHostInfoNotifier
+ public StripeHostInfoNotifier,
+ public ClusterStateBundleActivationListener
{
public:
TopLevelDistributor(DistributorComponentRegister&,
@@ -86,6 +88,8 @@ public:
const NodeIdentity& node_identity() const noexcept { return _node_identity; }
+ [[nodiscard]] bool done_initializing() const noexcept { return _done_initializing; } // True once this distributor has seen itself as UP in a cluster state
+
// Implements DistributorInterface and DistributorMessageSender.
DistributorMetricSet& metrics() override { return getMetrics(); }
const DistributorConfiguration& config() const override;
@@ -200,6 +204,9 @@ private:
uint32_t random_stripe_idx();
uint32_t stripe_of_bucket_id(const document::BucketId& bucket_id, const api::StorageMessage& msg);
+ // ClusterStateBundleActivationListener impl:
+ void on_cluster_state_bundle_activated(const lib::ClusterStateBundle&) override;
+
struct StripeScanStats {
bool wants_to_send_host_info = false;
bool has_reported_in_at_least_once = false;
@@ -209,7 +216,9 @@ private:
const NodeIdentity _node_identity;
DistributorComponentRegister& _comp_reg;
+ DoneInitializeHandler& _done_init_handler;
const bool _use_legacy_mode;
+ bool _done_initializing;
std::shared_ptr<DistributorMetricSet> _metrics;
std::shared_ptr<DistributorTotalMetrics> _total_metrics;
std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;