summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp14
-rw-r--r--storage/src/tests/distributor/distributortest.cpp36
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp35
-rw-r--r--storage/src/tests/distributor/distributortestutil.h13
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp19
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp10
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp2
8 files changed, 82 insertions, 49 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 97fccf58901..7e8fec3b83a 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1865,15 +1865,15 @@ TEST_F(BucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribut
TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
complete_recovery_mode();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
std::string distConfig(getDistConfig6Nodes4Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
// No replies received yet, still no recovery mode.
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
ASSERT_EQ(messageCount(6), _sender.commands().size());
uint32_t numBuckets = 10;
@@ -1884,9 +1884,9 @@ TEST_F(BucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode)
// Pending cluster state (i.e. distribution) has been enabled, which should
// cause recovery mode to be entered.
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
complete_recovery_mode();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
}
namespace {
@@ -2471,14 +2471,14 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
"version:2 distributor:1 storage:4", n_buckets, 4));
// Version should not be switched over yet
- EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint32_t(1), current_distributor_cluster_state_bundle().getVersion());
EXPECT_EQ(uint64_t(0), mutable_default_db().size());
EXPECT_EQ(uint64_t(0), mutable_global_db().size());
EXPECT_FALSE(activate_cluster_state_version(2));
- EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint32_t(2), current_distributor_cluster_state_bundle().getVersion());
EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size());
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 7958306db5f..2944348e639 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -175,6 +175,10 @@ struct DistributorTest : Test, DistributorTestUtil {
return _distributor->handleMessage(msg);
}
+ uint64_t db_sample_interval_sec() const noexcept {
+ return std::chrono::duration_cast<std::chrono::seconds>(_distributor->db_memory_sample_interval()).count();
+ }
+
void configure_stale_reads_enabled(bool enabled) {
ConfigBuilder builder;
builder.allowStaleReadsDuringClusterStateTransitions = enabled;
@@ -280,19 +284,19 @@ TEST_F(DistributorTest, recovery_mode_on_cluster_state_change) {
"storage:1 .0.s:d distributor:1");
enableDistributorClusterState("storage:1 distributor:1");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
for (uint32_t i = 0; i < 3; ++i) {
addNodesToBucketDB(document::BucketId(16, i), "0=1");
}
for (int i = 0; i < 3; ++i) {
tick();
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
}
tick();
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
enableDistributorClusterState("storage:2 distributor:1");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
}
// TODO -> stripe test
@@ -489,14 +493,6 @@ TEST_F(DistributorTest, metric_update_hook_updates_pending_maintenance_metrics)
}
}
-namespace {
-
-uint64_t db_sample_interval_sec(const Distributor& d) noexcept {
- return std::chrono::duration_cast<std::chrono::seconds>(d.db_memory_sample_interval()).count();
-}
-
-}
-
// TODO -> stripe test
TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_time_intervals) {
getClock().setAbsoluteTimeInSeconds(1000);
@@ -517,7 +513,7 @@ TEST_F(DistributorTest, bucket_db_memory_usage_metrics_only_updated_at_fixed_tim
// interval has passed. Instead, old metric gauge values should be preserved.
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a,1=2/2/2");
- const auto sample_interval_sec = db_sample_interval_sec(getDistributor());
+ const auto sample_interval_sec = db_sample_interval_sec();
getClock().setAbsoluteTimeInSeconds(1000 + sample_interval_sec - 1); // Not there yet.
tickDistributorNTimes(50);
distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
@@ -925,7 +921,7 @@ TEST_F(DistributorTest, merge_busy_inhibit_duration_is_propagated_to_pending_mes
reply->setResult(api::ReturnCode(api::ReturnCode::BUSY));
_distributor->handleReply(std::shared_ptr<api::StorageReply>(std::move(reply)));
- auto& node_info = _distributor->getPendingMessageTracker().getNodeInfo();
+ auto& node_info = pending_message_tracker().getNodeInfo();
EXPECT_TRUE(node_info.isBusy(0));
getClock().addSecondsToTime(99);
@@ -1045,7 +1041,7 @@ TEST_F(DistributorTest, entering_recovery_mode_resets_bucket_space_stats) {
tickDistributorNTimes(5); // 1/3rds into second round through database
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
// Bucket space stats should now be invalid per space per node, pending stats
// from state version 2. Exposing stats from version 1 risks reporting stale
// information back to the cluster controller.
@@ -1066,13 +1062,13 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
addNodesToBucketDB(document::BucketId(16, 2), "0=1/1/1/t/a");
enableDistributorClusterState("version:2 distributor:1 storage:3 .1.s:d");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(1); // DB round not yet complete
EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
tickDistributorNTimes(2); // DB round complete after 2nd bucket + "scan done" discovery tick
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
// Now out of recovery mode, subsequent round completions should not send replies
tickDistributorNTimes(10);
EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
@@ -1080,12 +1076,12 @@ TEST_F(DistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_rep
void DistributorTest::do_test_pending_merge_getnodestate_reply_edge(BucketSpace space) {
setupDistributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
- EXPECT_TRUE(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(distributor_is_in_recovery_mode());
// 2 buckets with missing replicas triggering merge pending stats
addNodesToBucketDB(Bucket(space, BucketId(16, 1)), "0=1/1/1/t/a");
addNodesToBucketDB(Bucket(space, BucketId(16, 2)), "0=1/1/1/t/a");
tickDistributorNTimes(3);
- EXPECT_FALSE(_distributor->isInRecoveryMode());
+ EXPECT_FALSE(distributor_is_in_recovery_mode());
const auto space_name = FixedBucketSpaces::to_string(space);
assertBucketSpaceStats(2, 0, 1, space_name, _distributor->getBucketSpacesStats());
// First completed scan sends off merge stats et al to cluster controller
@@ -1214,7 +1210,7 @@ TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_s
TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) {
set_up_and_start_get_op_with_stale_reads_enabled(true);
Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1));
- EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage(
+ EXPECT_FALSE(pending_message_tracker().hasPendingMessage(
0, bucket, api::MessageType::GET_ID));
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index bdd953b6206..5d204693971 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -425,6 +425,36 @@ DistributorTestUtil::getReadOnlyBucketSpaceRepo() const {
return _distributor->getReadOnlyBucketSpaceRepo();
}
+bool
+DistributorTestUtil::distributor_is_in_recovery_mode() const noexcept {
+ return _distributor->isInRecoveryMode();
+}
+
+const lib::ClusterStateBundle&
+DistributorTestUtil::current_distributor_cluster_state_bundle() const noexcept {
+ return getDistributor().getClusterStateBundle();
+}
+
+std::string
+DistributorTestUtil::active_ideal_state_operations() const {
+ return _distributor->getActiveIdealStateOperations();
+}
+
+const PendingMessageTracker&
+DistributorTestUtil::pending_message_tracker() const noexcept {
+ return _distributor->getPendingMessageTracker();
+}
+
+PendingMessageTracker&
+DistributorTestUtil::pending_message_tracker() noexcept {
+ return _distributor->getPendingMessageTracker();
+}
+
+std::chrono::steady_clock::duration
+DistributorTestUtil::db_memory_sample_interval() const noexcept {
+ return _distributor->db_memory_sample_interval();
+}
+
const lib::Distribution&
DistributorTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
@@ -453,4 +483,9 @@ DistributorTestUtil::enable_distributor_cluster_state(const lib::ClusterStateBun
getBucketDBUpdater().simulate_cluster_state_bundle_activation(state);
}
+void
+DistributorTestUtil::setSystemState(const lib::ClusterState& systemState) {
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index b845456e873..0ed2498a0a2 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -118,9 +118,8 @@ public:
storage::distributor::DistributorStripeComponent& distributor_component();
storage::distributor::DistributorStripeOperationContext& operation_context();
- Distributor& getDistributor() {
- return *_distributor;
- }
+ Distributor& getDistributor() noexcept { return *_distributor; }
+ const Distributor& getDistributor() const noexcept { return *_distributor; }
bool tick();
@@ -140,6 +139,12 @@ public:
const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo();
const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const;
+ [[nodiscard]] bool distributor_is_in_recovery_mode() const noexcept;
+ [[nodiscard]] const lib::ClusterStateBundle& current_distributor_cluster_state_bundle() const noexcept;
+ [[nodiscard]] std::string active_ideal_state_operations() const;
+ [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
+ [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
+ [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
const lib::Distribution& getDistribution() const;
// "End to end" distribution change trigger, which will invoke the bucket
@@ -190,6 +195,8 @@ public:
DistributorMessageSenderStub& sender() noexcept { return _sender; }
const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
+
+ void setSystemState(const lib::ClusterState& systemState);
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index ce9aa0a6800..0a36e5cd0e5 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -38,10 +38,6 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
close();
}
- void setSystemState(const lib::ClusterState& systemState) {
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
- }
-
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
const PendingMessageTracker& tracker,
@@ -120,7 +116,7 @@ TEST_F(IdealStateManagerTest, disabled_state_checker) {
ost.str());
tick();
- EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
+ EXPECT_EQ("", active_ideal_state_operations());
}
@@ -143,13 +139,12 @@ TEST_F(IdealStateManagerTest, clear_active_on_node_down) {
EXPECT_EQ("setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n"
"setbucketstate to [2] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
setSystemState(lib::ClusterState("distributor:1 storage:3 .2.s:d"));
- EXPECT_EQ("", _distributor->getActiveIdealStateOperations());
- EXPECT_EQ(0, _distributor->getPendingMessageTracker()
- .getNodeInfo().getPendingCount(0));
+ EXPECT_EQ("", active_ideal_state_operations());
+ EXPECT_EQ(0, pending_message_tracker().getNodeInfo().getPendingCount(0));
}
TEST_F(IdealStateManagerTest, recheck_when_active) {
@@ -162,17 +157,17 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
tick();
EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n",
- _distributor->getActiveIdealStateOperations());
+ active_ideal_state_operations());
}
TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 02491b670c6..478f20796d0 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -67,7 +67,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
createLinks();
setupDistributor(1, 1, "version:1 distributor:1 storage:1");
_op_owner = std::make_unique<OperationOwner>(_sender, getClock());
- _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker());
+ _sender.setPendingMessageTracker(pending_message_tracker());
addNodesToBucketDB(_sub_bucket, "0=1/2/3/t");
}
@@ -96,7 +96,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
std::shared_ptr<ReadForWriteVisitorOperationStarter> create_rfw_op(std::shared_ptr<VisitorOperation> visitor_op) {
return std::make_shared<ReadForWriteVisitorOperationStarter>(
std::move(visitor_op), operation_sequencer(),
- *_op_owner, getDistributor().getPendingMessageTracker(),
+ *_op_owner, pending_message_tracker(),
_mock_uuid_generator);
}
};
@@ -123,7 +123,7 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pend
std::move(nodes),
api::Timestamp(123456));
merge->setAddress(make_storage_address(0));
- getDistributor().getPendingMessageTracker().insert(merge);
+ pending_message_tracker().insert(merge);
_op_owner->start(op, OperationStarter::Priority(120));
ASSERT_EQ("", _sender.getCommands(true));
EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
@@ -157,13 +157,13 @@ struct ConcurrentMutationFixture {
_mutation = _test.sender().command(0);
// Since pending message tracking normally happens in the distributor itself during sendUp,
// we have to emulate this and explicitly insert the sent message into the pending mapping.
- _test.getDistributor().getPendingMessageTracker().insert(_mutation);
+ _test.pending_message_tracker().insert(_mutation);
}
void unblock_bucket() {
// Pretend update operation completed
auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply());
- _test.getDistributor().getPendingMessageTracker().reply(*update_reply);
+ _test.pending_message_tracker().reply(*update_reply);
_test._op_owner->handleReply(update_reply);
}
};
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 9b49f1347cc..8310e4e38e0 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -48,7 +48,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
};
void enableClusterState(const lib::ClusterState& systemState) {
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
+ setSystemState(systemState);
}
void insertJoinableBuckets();
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index ccbb64e8970..f9a4e6dbe0f 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -835,7 +835,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
TEST_F(VisitorOperationTest, visit_ideal_node) {
ClusterState state("distributor:1 storage:3");
- _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state));
+ enable_distributor_cluster_state(lib::ClusterStateBundle(state));
// Create buckets in bucketdb
for (int i=0; i<32; i++ ) {