summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/mergeoperationtest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp71
1 files changed, 64 insertions, 7 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 0773958e535..6ed05e14519 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -45,6 +45,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
MergeBucketMetricSet& get_merge_metrics();
+ [[nodiscard]] uint32_t merge_footprint(const std::string& db_state);
};
std::shared_ptr<MergeOperation>
@@ -86,7 +87,7 @@ MergeOperationTest::assert_simple_merge_bucket_command()
{
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0",
_sender.getLastCommand(true));
}
@@ -295,7 +296,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0");
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
@@ -356,8 +357,8 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) {
std::string merge(
"MergeBucketCommand(BucketId(0x4000000000000001), to time "
- "10000000, cluster state version: 0, nodes: [0, 2, 1 "
- "(source only)], chain: [], reasons to start: ) => 0");
+ "10000000, cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
sendReply(op);
@@ -580,14 +581,14 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [2, 1, 3], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
// All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
setup_simple_merge_op({1, 2});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
"cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
- "reasons to start: ) => 2",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 2",
_sender.getLastCommand(true));
_sender.clear();
@@ -600,7 +601,7 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({2, 1});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
"cluster state version: 0, nodes: [2, 1], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
}
@@ -644,4 +645,60 @@ TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) {
EXPECT_FALSE(op->ok());
}
+uint32_t MergeOperationTest::merge_footprint(const std::string& db_state) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), db_state);
+ enable_cluster_state("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ _sender.clear();
+ op.start(_sender);
+ assert(!_sender.commands().empty());
+ auto cmd_as_merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]);
+ assert(cmd_as_merge);
+ return cmd_as_merge->estimated_memory_footprint();
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_computed_from_replica_state) {
+ // Reminder of syntax: "index=checksum/doc count/doc size"
+ // {0,2} in sync, {1} out of sync; footprint is sum across "sync-ness groups"
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=10/100/3000"), 10'000);
+ EXPECT_EQ(merge_footprint("0=10/100/7000,1=20/200/3000,2=10/100/7000"), 10'000);
+ // All replicas mutually out of sync
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=30/100/5000"), 15'000);
+ // One replica empty
+ EXPECT_EQ(merge_footprint("0=20/200/4000,1=20/200/4000,2=1/0/0"), 4'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_bounded_by_max_expected_merge_chunk_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000); // proxy for max merge chunk size
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/9999"), 19'999);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10000"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10001"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/6000,1=20/200/7000,2=30/100/20000"), 20'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_with_single_doc_replica_can_be_greater_than_max_expected_bucket_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000);
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/1/50000"), 50'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/1/60000,2=30/1/50000"), 60'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_estimation_saturates_instead_of_overflowing_u32_limits) {
+ auto cfg = make_config();
+ cfg->setSplitSize(1'234'567);
+ configure_stripe(cfg);
+ // Here we massively _undercount_ our estimate, but this is a wildly unrealistic replica state
+ // just for testing correctness of arithmetic ops.
+ // UINT32_MAX/3 * 3 + 1 will overflow to 0 if unchecked. Must be saturated instead.
+ EXPECT_EQ(merge_footprint("0=10/10/1431655765,1=20/10/1431655765,2=30/10/1431655766"), 1'234'567);
+}
+
} // storage::distributor