aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-05-03 11:09:02 +0200
committerGitHub <noreply@github.com>2021-05-03 11:09:02 +0200
commit30a495955ab74ccd37533f50f2b67d97c148d48f (patch)
tree4e8fa3964be427fc358d3b3ecf185ec2fb9b76cc
parent8fb20fde4661ab73946c338e3b4b3209d2e79b73 (diff)
Revert "Make more Distributor internals only available to friended tests"
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp14
-rw-r--r--storage/src/tests/distributor/distributortest.cpp36
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp35
-rw-r--r--storage/src/tests/distributor/distributortestutil.h13
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp19
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp10
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp48
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h45
10 files changed, 86 insertions, 138 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 7e8fec3b83a..97fccf58901 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut
TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
complete_recovery_mode();
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
std::string distConfig(getDistConfig6Nodes4Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
// No replies received yet, still no recovery mode.
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
ASSERT_EQ(messageCount(6), _sender.commands().size());
uint32_t numBuckets = 10;
@@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode)
// Pending cluster state (i.e. distribution) has been enabled, which should
// cause recovery mode to be entered.
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
complete_recovery_mode();
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
}
namespace {
@@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
"version:2 distributor:1 storage:4", n_buckets, 4));
// Version should not be switched over yet
- EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion());
+ EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
EXPECT_EQ(uint64_t(0), mutable_default_db().size());
EXPECT_EQ(uint64_t(0), mutable_global_db().size());
EXPECT_FALSE(activate_cluster_state_version(2));
- EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion());
+ EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size());
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 2944348e639..7958306db5f 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -175,10 +175,6 @@ struct DistributorTest : Test, DistributorTestUtil {
return _distributor->handleMessage(msg);
}
- uint64_t db_sample_interval_sec() const noexcept {
- return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count();
- }
-
void configure_stale_reads_enabled(bool enabled) {
ConfigBuilder builder;
builder.allowStaleReadsDuringClusterStateTransitions = enabled;
@@ -284,19 +280,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
"storage:1 .0.s:d distributor:1");
enableDistributorClusterState("storage:1 distributor:1");
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
for (uint32_t i = 0; i < 3; ++i) {
addNodesToBucketDB(document::BucketId(16, i), "0=1");
}
for (int i = 0; i < 3; ++i) {
tick();
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
}
tick();
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
enableDistributorClusterState("storage:2 distributor:1");
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
}
// TODO -> stripe test
@@ -493,6 +489,14 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics)
}
}
+namespace {
+
+uint64_t db_sample_interval_sec(const Distributor& d) noexcept {
+ return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count();
+}
+
+}
+
// TODO -> stripe test
TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) {
getClock().setAbsoluteTimeInSeconds(1000);
@@ -513,7 +517,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim
// interval has passed. Instead, old metric gauge values should be preserved.
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2");
- const auto sample_interval_sec = db_sample_interval_sec();
+ const auto sample_interval_sec = db_sample_interval_sec(getDistributor());
getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet.
tickDistributorNTimes(50);
distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
@@ -921,7 +925,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes
reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
_distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
- auto& node_info = pending_message_tracker().getNodeInfo();
+ auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
EXPECT_TRUE(node_info.isBusy(0));
getClock().addSecondsToTime(99);
@@ -1041,7 +1045,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
tickDistributorNTimes(5); // 1/3rds into second round through database
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
// Bucket space stats should now be invalid per space per node, pending stats
// from state version 2. Exposing stats from version 1 risks reporting stale
// information back to the cluster controller.
@@ -1062,13 +1066,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a");
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(1); // DB round not yet complete
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
// Now out of recovery mode, subsequent round completions should not send replies
tickDistributorNTimes(10);
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
@@ -1076,12 +1080,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) {
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
- EXPECT_TRUE(distributor_is_in_recovery_mode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
// 2 buckets with missing replicas triggering merge pending stats
addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a");
addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a");
tickDistributorNTimes(3);
- EXPECT_FALSE(distributor_is_in_recovery_mode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
const auto space_name = FixedBucketSpaces::to_string(space);
assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats());
// First completed scan sends off merge stats et al to cluster controller
@@ -1210,7 +1214,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s
TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) {
set_up_and_start_get_op_with_stale_reads_enabled(true);
Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1));
- EXPECT_FALSE(pending_message_tracker().hasPendingMessage(
+ EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage(
0, bucket, api::MessageType::GET_ID));
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5d204693971..bdd953b6206 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -425,36 +425,6 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const {
return _distributor->getReadOnlyBucketSpaceRepo();
}
-bool
-DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept {
- return _distributor->isInRecoveryMode();
-}
-
-const lib::ClusterStateBundle&
-DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept {
- return getDistributor().getClusterStateBundle();
-}
-
-std::string
-DistributorTestUtil::active_ideal_state_operations() const {
- return _distributor->getActiveIdealStateOperations();
-}
-
-const PendingMessageTracker&
-DistributorTestUtil::pending_message_tracker() const noexcept {
- return _distributor->getPendingMessageTracker();
-}
-
-PendingMessageTracker&
-DistributorTestUtil::pending_message_tracker() noexcept {
- return _distributor->getPendingMessageTracker();
-}
-
-std::chrono::steady_clock::duration
-DistributorTestUtil::db_memory_sample_interval() const noexcept {
- return _distributor->db_memory_sample_interval();
-}
-
const lib::Distribution&
DistributorTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
@@ -483,9 +453,4 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun
getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
}
-void
-DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) {
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
-}
-
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 0ed2498a0a2..b845456e873 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -118,8 +118,9 @@ public:
storage::distributor::DistributorStripeComponent& distributor_component();
storage::distributor::DistributorStripeOperationContext& operation_context();
- Distributor& getDistributor() noexcept { return *_distributor; }
- const Distributor& getDistributor() const noexcept { return *_distributor; }
+ Distributor& getDistributor() {
+ return *_distributor;
+ }
bool tick();
@@ -139,12 +140,6 @@ public:
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo();
const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const;
- [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept;
- [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept;
- [[nodiscard]] std::string active_ideal_state_operations() const;
- [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
- [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
- [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
const lib::Distribution& getDistribution() const;
// "End to end" distribution change trigger, which will invoke the bucket
@@ -195,8 +190,6 @@ public:
DistributorMessageSenderStub& sender() noexcept { return _sender; }
const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
-
- void setSystemState(const lib::ClusterState& systemState);
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 0a36e5cd0e5..ce9aa0a6800 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -38,6 +38,10 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
close();
}
+ void setSystemState(const lib::ClusterState& systemState) {
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+ }
+
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
const PendingMessageTracker& tracker,
@@ -116,7 +120,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) {
ost.str());
tick();
- EXPECT_EQ("", active_ideal_state_operations());
+ EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
}
@@ -139,12 +143,13 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) {
EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n",
- active_ideal_state_operations());
+ _distributor->getActiveIdealStateOperations());
setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d"));
- EXPECT_EQ("", active_ideal_state_operations());
- EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0));
+ EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
+ EXPECT_EQ(0, _distributor->getPendingMessageTracker()
+ .getNodeInfo().getPendingCount(0));
}
TEST_F(IdealStateManagerTest, recheck_when_active) {
@@ -157,17 +162,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- active_ideal_state_operations());
+ _distributor->getActiveIdealStateOperations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- active_ideal_state_operations());
+ _distributor->getActiveIdealStateOperations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- active_ideal_state_operations());
+ _distributor->getActiveIdealStateOperations());
}
TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 478f20796d0..02491b670c6 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
createLinks();
setupDistributor(1, 1, "version:1 distributor:1 storage:1");
_op_owner = std::make_unique<OperationOwner>(_sender, getClock());
- _sender.setPendingMessageTracker(pending_message_tracker());
+ _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker());
addNodesToBucketDB(_sub_bucket, "0=1/2/3/t");
}
@@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) {
return std::make_shared<ReadForWriteVisitorOperationStarter>(
std::move(visitor_op), operation_sequencer(),
- *_op_owner, pending_message_tracker(),
+ *_op_owner, getDistributor().getPendingMessageTracker(),
_mock_uuid_generator);
}
};
@@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend
std::move(nodes),
api::Timestamp(123456));
merge->setAddress(make_storage_address(0));
- pending_message_tracker().insert(merge);
+ getDistributor().getPendingMessageTracker().insert(merge);
_op_owner->start(op, OperationStarter::Priority(120));
ASSERT_EQ("", _sender.getCommands(true));
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
@@ -157,13 +157,13 @@ struct ConcurrentMutationFixture {
_mutation = _test.sender().command(0);
// Since pending message tracking normally happens in the distributor itself during sendUp,
// we have to emulate this and explicitly insert the sent message into the pending mapping.
- _test.pending_message_tracker().insert(_mutation);
+ _test.getDistributor().getPendingMessageTracker().insert(_mutation);
}
void unblock_bucket() {
// Pretend update operation completed
auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply());
- _test.pending_message_tracker().reply(*update_reply);
+ _test.getDistributor().getPendingMessageTracker().reply(*update_reply);
_test._op_owner->handleReply(update_reply);
}
};
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 8310e4e38e0..9b49f1347cc 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
};
void enableClusterState(const lib::ClusterState& systemState) {
- setSystemState(systemState);
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
}
void insertJoinableBuckets();
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index f9a4e6dbe0f..ccbb64e8970 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
TEST_F(VisitorOperationTest, visit_ideal_node) {
ClusterState state("distributor:1 storage:3");
- enable_distributor_cluster_state(lib::ClusterStateBundle(state));
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state));
// Create buckets in bucketdb
for (int i=0; i<32; i++ ) {
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 04d8560298a..bdf3f30fdc9 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -54,9 +54,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_comp_reg(compReg),
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
- _use_legacy_mode(num_distributor_stripes == 0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
- doneInitHandler, *this, _use_legacy_mode)),
+ doneInitHandler, *this, (num_distributor_stripes == 0))),
_stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)),
_component(*this, compReg, "distributor"),
_total_config(_component.total_distributor_config_sp()),
@@ -72,7 +71,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
{
_component.registerMetric(*_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
- if (!_use_legacy_mode) {
+ if (num_distributor_stripes > 0) {
LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
@@ -91,54 +90,43 @@ Distributor::~Distributor()
closeNextLink();
}
-// TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists.
-// All functions below that assert on _use_legacy_mode are only currently used by tests
-
bool
Distributor::isInRecoveryMode() const noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->isInRecoveryMode();
}
const PendingMessageTracker&
Distributor::getPendingMessageTracker() const {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getPendingMessageTracker();
}
PendingMessageTracker&
Distributor::getPendingMessageTracker() {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getPendingMessageTracker();
}
DistributorBucketSpaceRepo&
Distributor::getBucketSpaceRepo() noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getBucketSpaceRepo();
}
const DistributorBucketSpaceRepo&
Distributor::getBucketSpaceRepo() const noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getBucketSpaceRepo();
}
DistributorBucketSpaceRepo&
Distributor::getReadOnlyBucketSpaceRepo() noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getReadOnlyBucketSpaceRepo();
}
const DistributorBucketSpaceRepo&
Distributor::getReadyOnlyBucketSpaceRepo() const noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getReadOnlyBucketSpaceRepo();;
}
storage::distributor::DistributorStripeComponent&
Distributor::distributor_component() noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE We need to grab the stripe's component since tests like to access
// these things uncomfortably directly.
return _stripe->_component;
@@ -146,55 +134,46 @@ Distributor::distributor_component() noexcept {
StripeBucketDBUpdater&
Distributor::bucket_db_updater() {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->bucket_db_updater();
}
const StripeBucketDBUpdater&
Distributor::bucket_db_updater() const {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->bucket_db_updater();
}
IdealStateManager&
Distributor::ideal_state_manager() {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->ideal_state_manager();
}
const IdealStateManager&
Distributor::ideal_state_manager() const {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->ideal_state_manager();
}
ExternalOperationHandler&
Distributor::external_operation_handler() {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->external_operation_handler();
}
const ExternalOperationHandler&
Distributor::external_operation_handler() const {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->external_operation_handler();
}
BucketDBMetricUpdater&
Distributor::bucket_db_metric_updater() const noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->_bucketDBMetricUpdater;
}
const DistributorConfiguration&
Distributor::getConfig() const {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->getConfig();
}
std::chrono::steady_clock::duration
Distributor::db_memory_sample_interval() const noexcept {
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->db_memory_sample_interval();
}
@@ -275,7 +254,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
// FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
// regardless of what RPC thread (comm mgr, FRT...) this is called from!
- if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*msg)) {
return msg->callHandler(*_bucket_db_updater, msg);
}
// TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
@@ -286,7 +265,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
- if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
+ if (_bucket_db_updater && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
return reply->callHandler(*_bucket_db_updater, reply);
}
return _stripe->handleReply(reply);
@@ -296,7 +275,6 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
bool
Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
{
- assert(_use_legacy_mode); // TODO STRIPE
return _stripe->handleMessage(msg);
}
@@ -321,7 +299,6 @@ Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply)
const lib::ClusterStateBundle&
Distributor::getClusterStateBundle() const
{
- assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE must offer a single unifying state across stripes
return _stripe->getClusterStateBundle();
}
@@ -329,7 +306,6 @@ Distributor::getClusterStateBundle() const
void
Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
{
- assert(_use_legacy_mode); // TODO STRIPE
// TODO STRIPE make test injection/force-function
_stripe->enableClusterStateBundle(state);
}
@@ -337,7 +313,7 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
void
Distributor::storageDistributionChanged()
{
- if (!_use_legacy_mode) {
+ if (_bucket_db_updater) {
if (!_distribution || (*_component.getDistribution() != *_distribution)) {
LOG(debug, "Distribution changed to %s, must re-fetch bucket information",
_component.getDistribution()->toString().c_str());
@@ -356,7 +332,7 @@ Distributor::storageDistributionChanged()
void
Distributor::enableNextDistribution()
{
- if (!_use_legacy_mode) {
+ if (_bucket_db_updater) {
if (_next_distribution) {
_distribution = _next_distribution;
_next_distribution = std::shared_ptr<lib::Distribution>();
@@ -374,7 +350,7 @@ void
Distributor::propagateDefaultDistribution(
std::shared_ptr<const lib::Distribution> distribution)
{
- // TODO STRIPE cannot directly access stripe when not in legacy mode!
+ // TODO STRIPE top-level bucket DB updater
_stripe->propagateDefaultDistribution(std::move(distribution));
}
@@ -407,7 +383,6 @@ Distributor::propagateInternalScanMetricsToExternal()
void
Distributor::scanAllBuckets()
{
- assert(_use_legacy_mode); // TODO STRIPE
_stripe->scanAllBuckets();
}
@@ -415,12 +390,11 @@ framework::ThreadWaitInfo
Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
- if (!_use_legacy_mode) {
+ if (_bucket_db_updater) {
enableNextDistribution();
}
// Propagates any new configs down to stripe(s)
enableNextConfig();
- // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise
_stripe->doCriticalTick(idx);
_tickResult.merge(_stripe->_tickResult);
return _tickResult;
@@ -429,7 +403,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx)
framework::ThreadWaitInfo
Distributor::doNonCriticalTick(framework::ThreadIndex idx)
{
- if (!_use_legacy_mode) {
+ if (_bucket_db_updater) {
_bucket_db_updater->resend_delayed_messages();
}
// TODO STRIPE stripes need their own thread loops!
@@ -443,7 +417,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
{
// Only lazily trigger a config propagation and internal update if something has _actually changed_.
if (_component.internal_config_generation() != _current_internal_config_generation) {
- if (!_use_legacy_mode) {
+ if (_bucket_db_updater) {
_total_config = _component.total_distributor_config_sp();
auto guard = _stripe_accessor->rendezvous_and_hold_all();
guard->update_total_distributor_config(_component.total_distributor_config_sp());
@@ -453,7 +427,7 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_current_internal_config_generation = _component.internal_config_generation();
}
- if (_use_legacy_mode) {
+ if (!_bucket_db_updater) {
// TODO STRIPE remove these once tests are fixed to trigger reconfig properly
_hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting());
_stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 0420f1b1f22..71d71290bb5 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -83,6 +83,12 @@ public:
int getDistributorIndex() const override { return _component.node_index(); }
const ClusterContext& cluster_context() const override { return _component.cluster_context(); }
+ /**
+ * Enables a new cluster state. Called after the bucket db updater has
+ * retrieved all bucket info related to the change.
+ */
+ void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle);
+
void storageDistributionChanged() override;
bool handleReply(const std::shared_ptr<api::StorageReply>& reply);
@@ -93,9 +99,26 @@ public:
bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
+ std::string getActiveIdealStateOperations() const;
+
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
+ const lib::ClusterStateBundle& getClusterStateBundle() const;
+ const DistributorConfiguration& getConfig() const;
+
+ bool isInRecoveryMode() const noexcept;
+
+ PendingMessageTracker& getPendingMessageTracker();
+ const PendingMessageTracker& getPendingMessageTracker() const;
+
+ DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
+ DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept;
+ const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept;
+
+ storage::distributor::DistributorStripeComponent& distributor_component() noexcept;
+
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -112,6 +135,8 @@ public:
Distributor& _self;
};
+ std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+
private:
friend struct DistributorTest;
friend class BucketDBUpdaterTest;
@@ -121,31 +146,14 @@ private:
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
- /**
- * Enables a new cluster state. Used by tests to bypass BucketDBUpdater.
- */
- void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle);
-
// Accessors used by tests
- std::string getActiveIdealStateOperations() const;
- const lib::ClusterStateBundle& getClusterStateBundle() const;
- const DistributorConfiguration& getConfig() const;
- bool isInRecoveryMode() const noexcept;
- PendingMessageTracker& getPendingMessageTracker();
- const PendingMessageTracker& getPendingMessageTracker() const;
- DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
- const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
- DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept;
- const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept;
- storage::distributor::DistributorStripeComponent& distributor_component() noexcept;
- std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
-
StripeBucketDBUpdater& bucket_db_updater();
const StripeBucketDBUpdater& bucket_db_updater() const;
IdealStateManager& ideal_state_manager();
const IdealStateManager& ideal_state_manager() const;
ExternalOperationHandler& external_operation_handler();
const ExternalOperationHandler& external_operation_handler() const;
+
BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept;
/**
@@ -169,7 +177,6 @@ private:
DistributorComponentRegister& _comp_reg;
std::shared_ptr<DistributorMetricSet> _metrics;
ChainedMessageSender* _messageSender;
- const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor;