summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-05-03 18:13:18 +0200
committerGitHub <noreply@github.com>2021-05-03 18:13:18 +0200
commit448d9205aa15351fbb770a7fbfae88d5b33bbe3d (patch)
tree0b6a281306a403b156129f6b10157528388d9975
parent896785b1a3b069ed13d8b51e7d35d7d333d80d02 (diff)
parenta4e1bdcf857daa21b0c6b7d40ad13f48260a946d (diff)
Merge pull request #17713 from vespa-engine/vekterli/make-more-distributor-internals-private-2
Make more distributor internals private 2; the much anticipated sequel [run-systemtest]
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp14
-rw-r--r--storage/src/tests/distributor/distributortest.cpp36
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp35
-rw-r--r--storage/src/tests/distributor/distributortestutil.h13
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp19
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp10
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp54
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h45
10 files changed, 141 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 97fccf58901..7e8fec3b83a 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut
TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
complete_recovery_mode();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
std::string distConfig(getDistConfig6Nodes4Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
// No replies received yet, still no recovery mode.
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
ASSERT_EQ(messageCount(6), _sender.commands().size());
uint32_t numBuckets = 10;
@@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode)
// Pending cluster state (i.e. distribution) has been enabled, which should
// cause recovery mode to be entered.
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
complete_recovery_mode();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
}
namespace {
@@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
"version:2 distributor:1 storage:4", n_buckets, 4));
// Version should not be switched over yet
- EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion());
EXPECT_EQ(uint64_t(0), mutable_default_db().size());
EXPECT_EQ(uint64_t(0), mutable_global_db().size());
EXPECT_FALSE(activate_cluster_state_version(2));
- EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion());
EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size());
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 7958306db5f..2944348e639 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -175,6 +175,10 @@ struct DistributorTest : Test, DistributorTestUtil {
return _distributor->handleMessage(msg);
}
+ uint64_t db_sample_interval_sec() const noexcept {
+ return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count();
+ }
+
void configure_stale_reads_enabled(bool enabled) {
ConfigBuilder builder;
builder.allowStaleReadsDuringClusterStateTransitions = enabled;
@@ -280,19 +284,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
"storage:1 .0.s:d distributor:1");
enableDistributorClusterState("storage:1 distributor:1");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
for (uint32_t i = 0; i < 3; ++i) {
addNodesToBucketDB(document::BucketId(16, i), "0=1");
}
for (int i = 0; i < 3; ++i) {
tick();
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
}
tick();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
enableDistributorClusterState("storage:2 distributor:1");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
}
// TODO -> stripe test
@@ -489,14 +493,6 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics)
}
}
-namespace {
-
-uint64_t db_sample_interval_sec(const Distributor& d) noexcept {
- return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count();
-}
-
-}
-
// TODO -> stripe test
TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) {
getClock().setAbsoluteTimeInSeconds(1000);
@@ -517,7 +513,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim
// interval has passed. Instead, old metric gauge values should be preserved.
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2");
- const auto sample_interval_sec = db_sample_interval_sec(getDistributor());
+ const auto sample_interval_sec = db_sample_interval_sec();
getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet.
tickDistributorNTimes(50);
distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
@@ -925,7 +921,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes
reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
_distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
- auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
+ auto& node_info = pending_message_tracker().getNodeInfo();
EXPECT_TRUE(node_info.isBusy(0));
getClock().addSecondsToTime(99);
@@ -1045,7 +1041,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
tickDistributorNTimes(5); // 1/3rds into second round through database
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
// Bucket space stats should now be invalid per space per node, pending stats
// from state version 2. Exposing stats from version 1 risks reporting stale
// information back to the cluster controller.
@@ -1066,13 +1062,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a");
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(1); // DB round not yet complete
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
// Now out of recovery mode, subsequent round completions should not send replies
tickDistributorNTimes(10);
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
@@ -1080,12 +1076,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) {
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
// 2 buckets with missing replicas triggering merge pending stats
addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a");
addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a");
tickDistributorNTimes(3);
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
const auto space_name = FixedBucketSpaces::to_string(space);
assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats());
// First completed scan sends off merge stats et al to cluster controller
@@ -1214,7 +1210,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s
TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) {
set_up_and_start_get_op_with_stale_reads_enabled(true);
Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1));
- EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage(
+ EXPECT_FALSE(pending_message_tracker().hasPendingMessage(
0, bucket, api::MessageType::GET_ID));
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index bdd953b6206..5d204693971 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -425,6 +425,36 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const {
return _distributor->getReadOnlyBucketSpaceRepo();
}
+bool
+DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept {
+ return _distributor->isInRecoveryMode();
+}
+
+const lib::ClusterStateBundle&
+DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept {
+ return getDistributor().getClusterStateBundle();
+}
+
+std::string
+DistributorTestUtil::active_ideal_state_operations() const {
+ return _distributor->getActiveIdealStateOperations();
+}
+
+const PendingMessageTracker&
+DistributorTestUtil::pending_message_tracker() const noexcept {
+ return _distributor->getPendingMessageTracker();
+}
+
+PendingMessageTracker&
+DistributorTestUtil::pending_message_tracker() noexcept {
+ return _distributor->getPendingMessageTracker();
+}
+
+std::chrono::steady_clock::duration
+DistributorTestUtil::db_memory_sample_interval() const noexcept {
+ return _distributor->db_memory_sample_interval();
+}
+
const lib::Distribution&
DistributorTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
@@ -453,4 +483,9 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun
getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
}
+void
+DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) {
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index b845456e873..0ed2498a0a2 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -118,9 +118,8 @@ public:
storage::distributor::DistributorStripeComponent& distributor_component();
storage::distributor::DistributorStripeOperationContext& operation_context();
- Distributor& getDistributor() {
- return *_distributor;
- }
+ Distributor& getDistributor() noexcept { return *_distributor; }
+ const Distributor& getDistributor() const noexcept { return *_distributor; }
bool tick();
@@ -140,6 +139,12 @@ public:
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo();
const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const;
+ [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept;
+ [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept;
+ [[nodiscard]] std::string active_ideal_state_operations() const;
+ [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
+ [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
+ [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
const lib::Distribution& getDistribution() const;
// "End to end" distribution change trigger, which will invoke the bucket
@@ -190,6 +195,8 @@ public:
DistributorMessageSenderStub& sender() noexcept { return _sender; }
const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
+
+ void setSystemState(const lib::ClusterState& systemState);
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index ce9aa0a6800..0a36e5cd0e5 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -38,10 +38,6 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
close();
}
- void setSystemState(const lib::ClusterState& systemState) {
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
- }
-
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
const PendingMessageTracker& tracker,
@@ -120,7 +116,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) {
ost.str());
tick();
- EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
+ EXPECT_EQ("", active_ideal_state_operations());
}
@@ -143,13 +139,12 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) {
EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d"));
- EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
- EXPECT_EQ(0, _distributor->getPendingMessageTracker()
- .getNodeInfo().getPendingCount(0));
+ EXPECT_EQ("", active_ideal_state_operations());
+ EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0));
}
TEST_F(IdealStateManagerTest, recheck_when_active) {
@@ -162,17 +157,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
}
TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 02491b670c6..478f20796d0 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
createLinks();
setupDistributor(1, 1, "version:1 distributor:1 storage:1");
_op_owner = std::make_unique<OperationOwner>(_sender, getClock());
- _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker());
+ _sender.setPendingMessageTracker(pending_message_tracker());
addNodesToBucketDB(_sub_bucket, "0=1/2/3/t");
}
@@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) {
return std::make_shared<ReadForWriteVisitorOperationStarter>(
std::move(visitor_op), operation_sequencer(),
- *_op_owner, getDistributor().getPendingMessageTracker(),
+ *_op_owner, pending_message_tracker(),
_mock_uuid_generator);
}
};
@@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend
std::move(nodes),
api::Timestamp(123456));
merge->setAddress(make_storage_address(0));
- getDistributor().getPendingMessageTracker().insert(merge);
+ pending_message_tracker().insert(merge);
_op_owner->start(op, OperationStarter::Priority(120));
ASSERT_EQ("", _sender.getCommands(true));
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
@@ -157,13 +157,13 @@ struct ConcurrentMutationFixture {
_mutation = _test.sender().command(0);
// Since pending message tracking normally happens in the distributor itself during sendUp,
// we have to emulate this and explicitly insert the sent message into the pending mapping.
- _test.getDistributor().getPendingMessageTracker().insert(_mutation);
+ _test.pending_message_tracker().insert(_mutation);
}
void unblock_bucket() {
// Pretend update operation completed
auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply());
- _test.getDistributor().getPendingMessageTracker().reply(*update_reply);
+ _test.pending_message_tracker().reply(*update_reply);
_test._op_owner->handleReply(update_reply);
}
};
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 9b49f1347cc..8310e4e38e0 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
};
void enableClusterState(const lib::ClusterState& systemState) {
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+ setSystemState(systemState);
}
void insertJoinableBuckets();
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index ccbb64e8970..f9a4e6dbe0f 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
TEST_F(VisitorOperationTest, visit_ideal_node) {
ClusterState state("distributor:1 storage:3");
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state));
+ enable_distributor_cluster_state(lib::ClusterStateBundle(state));
// Create buckets in bucketdb
for (int i=0; i<32; i++ ) {
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index bdf3f30fdc9..6975a2595ad 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -54,8 +54,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_comp_reg(compReg),
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
+ _use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
- doneInitHandler, *this, (num_distributor_stripes == 0))),
+ doneInitHandler, *this, _use_legacy_mode)),
_stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
_component(*this, compReg, "distributor"),
_total_config(_component.total_distributor_config_sp()),
@@ -71,14 +72,14 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
- if (num_distributor_stripes > 0) {
+ if (!_use_legacy_mode) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
*_stripe_accessor);
}
- _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
+ _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting());
_distributorStatusDelegate.registerStatusPage();
hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter);
propagateDefaultDistribution(_component.getDistribution());
@@ -90,43 +91,54 @@ Distributor::~Distributor()
closeNextLink();
}
+// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists.
+// All functions below that assert on _use_legacy_mode are only currently used by tests
+
bool
Distributor::isInRecoveryMode() const noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->isInRecoveryMode();
}
const PendingMessageTracker&
Distributor::getPendingMessageTracker() const {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getPendingMessageTracker();
}
PendingMessageTracker&
Distributor::getPendingMessageTracker() {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getPendingMessageTracker();
}
DistributorBucketSpaceRepo&
Distributor::getBucketSpaceRepo() noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getBucketSpaceRepo();
}
const DistributorBucketSpaceRepo&
Distributor::getBucketSpaceRepo() const noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getBucketSpaceRepo();
}
DistributorBucketSpaceRepo&
Distributor::getReadOnlyBucketSpaceRepo() noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getReadOnlyBucketSpaceRepo();
}
const DistributorBucketSpaceRepo&
Distributor::getReadyOnlyBucketSpaceRepo() const noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getReadOnlyBucketSpaceRepo();;
}
storage::distributor::DistributorStripeComponent&
Distributor::distributor_component() noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE We need to grab the stripe's component since tests like to access
// these things uncomfortably directly.
return _stripe->_component;
@@ -134,46 +146,55 @@ Distributor::distributor_component() noexcept {
StripeBucketDBUpdater&
Distributor::bucket_db_updater() {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->bucket_db_updater();
}
const StripeBucketDBUpdater&
Distributor::bucket_db_updater() const {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->bucket_db_updater();
}
IdealStateManager&
Distributor::ideal_state_manager() {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->ideal_state_manager();
}
const IdealStateManager&
Distributor::ideal_state_manager() const {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->ideal_state_manager();
}
ExternalOperationHandler&
Distributor::external_operation_handler() {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->external_operation_handler();
}
const ExternalOperationHandler&
Distributor::external_operation_handler() const {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->external_operation_handler();
}
BucketDBMetricUpdater&
Distributor::bucket_db_metric_updater() const noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->_bucketDBMetricUpdater;
}
const DistributorConfiguration&
Distributor::getConfig() const {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getConfig();
}
std::chrono::steady_clock::duration
Distributor::db_memory_sample_interval() const noexcept {
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->db_memory_sample_interval();
}
@@ -254,7 +275,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
// FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
// regardless of what RPC thread (comm mgr, FRT...) this is called from!
- if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
return msg->callHandler(*_bucket_db_updater, msg);
}
// TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
@@ -265,7 +286,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
- if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
+ if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
return reply->callHandler(*_bucket_db_updater, reply);
}
return _stripe->handleReply(reply);
@@ -275,6 +296,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
+ assert(_use_legacy_mode); // TODO STRIPE
return _stripe->handleMessage(msg);
}
@@ -299,6 +321,7 @@ Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply)
const lib::ClusterStateBundle&
Distributor::getClusterStateBundle() const
{
+ assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE must offer a single unifying state across stripes
return _stripe->getClusterStateBundle();
}
@@ -306,6 +329,7 @@ Distributor::getClusterStateBundle() const
void
Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
{
+ assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE make test injection/force-function
_stripe->enableClusterStateBundle(state);
}
@@ -313,7 +337,7 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
void
Distributor::storageDistributionChanged()
{
- if (_bucket_db_updater) {
+ if (!_use_legacy_mode) {
if (!_distribution || (*_component.getDistribution() != *_distribution)) {
LOG(debug, "Distribution changed to %s, must re-fetch bucket information",
_component.getDistribution()->toString().c_str());
@@ -332,7 +356,7 @@ Distributor::storageDistributionChanged()
void
Distributor::enableNextDistribution()
{
- if (_bucket_db_updater) {
+ if (!_use_legacy_mode) {
if (_next_distribution) {
_distribution = _next_distribution;
_next_distribution = std::shared_ptr<lib::Distribution>();
@@ -350,7 +374,7 @@ void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
- // TODO STRIPE top-level bucket DB updater
+ // TODO STRIPE cannot directly access stripe when not in legacy mode!
_stripe->propagateDefaultDistribution(std::move(distribution));
}
@@ -383,6 +407,7 @@ Distributor::propagateInternalScanMetricsToExternal()
void
Distributor::scanAllBuckets()
{
+ assert(_use_legacy_mode); // TODO STRIPE
_stripe->scanAllBuckets();
}
@@ -390,11 +415,12 @@ framework::ThreadWaitInfo
Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- if (_bucket_db_updater) {
+ if (!_use_legacy_mode) {
enableNextDistribution();
}
// Propagates any new configs down to stripe(s)
enableNextConfig();
+ // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise
_stripe->doCriticalTick(idx);
_tickResult.merge(_stripe->_tickResult);
return _tickResult;
@@ -403,7 +429,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
- if (_bucket_db_updater) {
+ if (!_use_legacy_mode) {
_bucket_db_updater->resend_delayed_messages();
}
// TODO STRIPE stripes need their own thread loops!
@@ -417,17 +443,17 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
{
// Only lazily trigger a config propagation and internal update if something has _actually changed_.
if (_component.internal_config_generation() != _current_internal_config_generation) {
- if (_bucket_db_updater) {
- _total_config = _component.total_distributor_config_sp();
+ _total_config = _component.total_distributor_config_sp();
+ if (!_use_legacy_mode) {
auto guard = _stripe_accessor->rendezvous_and_hold_all();
guard->update_total_distributor_config(_component.total_distributor_config_sp());
} else {
_stripe->update_total_distributor_config(_component.total_distributor_config_sp());
}
- _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
+ _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting());
_current_internal_config_generation = _component.internal_config_generation();
}
- if (!_bucket_db_updater) {
+ if (_use_legacy_mode) {
// TODO STRIPE remove these once tests are fixed to trigger reconfig properly
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 71d71290bb5..0420f1b1f22 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -83,12 +83,6 @@ public:
int getDistributorIndex() const override { return _component.node_index(); }
const ClusterContext& cluster_context() const override { return _component.cluster_context(); }
- /**
- * Enables a new cluster state. Called after the bucket db updater has
- * retrieved all bucket info related to the change.
- */
- void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle);
-
void storageDistributionChanged() override;
bool handleReply(const std::shared_ptr<api::StorageReply>& reply);
@@ -99,26 +93,9 @@ public:
bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
- std::string getActiveIdealStateOperations() const;
-
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
- const lib::ClusterStateBundle& getClusterStateBundle() const;
- const DistributorConfiguration& getConfig() const;
-
- bool isInRecoveryMode() const noexcept;
-
- PendingMessageTracker& getPendingMessageTracker();
- const PendingMessageTracker& getPendingMessageTracker() const;
-
- DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
- const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
- DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept;
- const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept;
-
- storage::distributor::DistributorStripeComponent& distributor_component() noexcept;
-
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -135,8 +112,6 @@ public:
Distributor& _self;
};
- std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
-
private:
friend struct DistributorTest;
friend class BucketDBUpdaterTest;
@@ -146,14 +121,31 @@ private:
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
+ /**
+ * Enables a new cluster state. Used by tests to bypass BucketDBUpdater.
+ */
+ void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle);
+
// Accessors used by tests
+ std::string getActiveIdealStateOperations() const;
+ const lib::ClusterStateBundle& getClusterStateBundle() const;
+ const DistributorConfiguration& getConfig() const;
+ bool isInRecoveryMode() const noexcept;
+ PendingMessageTracker& getPendingMessageTracker();
+ const PendingMessageTracker& getPendingMessageTracker() const;
+ DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept;
+ storage::distributor::DistributorStripeComponent& distributor_component() noexcept;
+ std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+
StripeBucketDBUpdater& bucket_db_updater();
const StripeBucketDBUpdater& bucket_db_updater() const;
IdealStateManager& ideal_state_manager();
const IdealStateManager& ideal_state_manager() const;
ExternalOperationHandler& external_operation_handler();
const ExternalOperationHandler& external_operation_handler() const;
-
BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept;
/**
@@ -177,6 +169,7 @@ private:
DistributorComponentRegister& _comp_reg;
std::shared_ptr<DistributorMetricSet> _metrics;
ChainedMessageSender* _messageSender;
+ const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;