author    Tor Brede Vekterli <vekterli@vespa.ai>  2023-10-31 13:06:05 +0000
committer Tor Brede Vekterli <vekterli@vespa.ai>  2023-11-02 10:29:22 +0000
commit    c87ae9ba1b1f1745191e75edefcaf436af3ab0c6 (patch)
tree      091c4a7db0bf6034f14c5cd90de2428029ba0cef /storage
parent    6567a48a55f05f2f20c0cc7ae63dc454319c87e1 (diff)
Heuristically compute expected merge memory usage upper bound
The distributor only knows a limited amount of metadata per bucket replica (roughly: checksum, doc count, doc size). It therefore has no way to know if two replicas with different checksums, both with 1000 documents, have 999 or 0 documents in common. We therefore have to assume the worst and estimate the worst-case memory usage as the _sum_ of mutually divergent replica sizes.

Estimates are bounded by the expected bucket merge chunk size, as we make the simplifying assumption that memory usage for a particular node is (roughly) limited to this value for any given bucket. One special-cased exception to this is single-document replicas, as one document cannot be split across multiple chunks by definition. Here we track the largest single-document replica.
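To make the estimation concrete, here is a minimal standalone sketch of the heuristic described above. The ReplicaMeta struct and the estimate_footprint_ubound function are illustrative stand-ins, not names from the patch; the actual implementation is MergeOperation::estimate_merge_memory_footprint_ubound in the diff below.

#include <algorithm>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Illustrative stand-in for the per-replica metadata the distributor tracks.
struct ReplicaMeta {
    uint32_t checksum;
    uint32_t doc_count;
    uint32_t used_file_size; // replica size in bytes
};

uint32_t estimate_footprint_ubound(const std::vector<ReplicaMeta>& replicas,
                                   uint32_t max_merge_chunk_size) {
    std::unordered_set<uint32_t> seen_checksums;
    uint32_t worst_case = 0;          // sum across mutually divergent replica groups
    uint32_t largest_single_doc = 0;  // single-doc replicas may exceed the chunk bound
    for (const auto& r : replicas) {
        if (seen_checksums.insert(r.checksum).second) { // first replica of its group
            // Saturating add; assumes disjoint document sets across groups.
            worst_case = (worst_case + r.used_file_size >= worst_case)
                       ? worst_case + r.used_file_size : UINT32_MAX;
            if (r.doc_count == 1) {
                largest_single_doc = std::max(largest_single_doc, r.used_file_size);
            }
        }
    }
    // Cap by the expected merge chunk size, except for oversized single documents.
    return std::max(std::min(worst_case, max_merge_chunk_size), largest_single_doc);
}

With replica metadata (checksum/doc count/doc size) of 10/100/3000, 20/200/7000 and 10/100/3000, the two in-sync replicas form one group, giving 3000 + 7000 = 10'000 bytes, which matches the first test case below.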
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/mergeoperationtest.cpp                            | 71
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp  | 40
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h    |  5
3 files changed, 106 insertions(+), 10 deletions(-)
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 0773958e535..6ed05e14519 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -45,6 +45,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
MergeBucketMetricSet& get_merge_metrics();
+ [[nodiscard]] uint32_t merge_footprint(const std::string& db_state);
};
std::shared_ptr<MergeOperation>
@@ -86,7 +87,7 @@ MergeOperationTest::assert_simple_merge_bucket_command()
{
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0",
_sender.getLastCommand(true));
}
@@ -295,7 +296,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0");
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
@@ -356,8 +357,8 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) {
std::string merge(
"MergeBucketCommand(BucketId(0x4000000000000001), to time "
- "10000000, cluster state version: 0, nodes: [0, 2, 1 "
- "(source only)], chain: [], reasons to start: ) => 0");
+ "10000000, cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
sendReply(op);
@@ -580,14 +581,14 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [2, 1, 3], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
// All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
setup_simple_merge_op({1, 2});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
"cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
- "reasons to start: ) => 2",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 2",
_sender.getLastCommand(true));
_sender.clear();
@@ -600,7 +601,7 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({2, 1});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
"cluster state version: 0, nodes: [2, 1], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
}
@@ -644,4 +645,60 @@ TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) {
EXPECT_FALSE(op->ok());
}
+uint32_t MergeOperationTest::merge_footprint(const std::string& db_state) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), db_state);
+ enable_cluster_state("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ _sender.clear();
+ op.start(_sender);
+ assert(!_sender.commands().empty());
+ auto cmd_as_merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]);
+ assert(cmd_as_merge);
+ return cmd_as_merge->estimated_memory_footprint();
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_computed_from_replica_state) {
+ // Reminder of syntax: "index=checksum/doc count/doc size"
+ // {0,2} in sync, {1} out of sync; footprint is sum across "sync-ness groups"
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=10/100/3000"), 10'000);
+ EXPECT_EQ(merge_footprint("0=10/100/7000,1=20/200/3000,2=10/100/7000"), 10'000);
+ // All replicas mutually out of sync
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=30/100/5000"), 15'000);
+ // One replica empty
+ EXPECT_EQ(merge_footprint("0=20/200/4000,1=20/200/4000,2=1/0/0"), 4'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_bounded_by_max_expected_merge_chunk_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000); // proxy for max merge chunk size
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/9999"), 19'999);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10000"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10001"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/6000,1=20/200/7000,2=30/100/20000"), 20'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_with_single_doc_replica_can_be_greater_than_max_expected_bucket_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000);
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/1/50000"), 50'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/1/60000,2=30/1/50000"), 60'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_estimation_saturates_instead_of_overflowing_u32_limits) {
+ auto cfg = make_config();
+ cfg->setSplitSize(1'234'567);
+ configure_stripe(cfg);
+ // Here we massively _undercount_ our estimate, but this is a wildly unrealistic replica state
+ // just for testing correctness of arithmetic ops.
+ // UINT32_MAX/3 * 3 + 1 will overflow to 0 if unchecked. Must be saturated instead.
+ EXPECT_EQ(merge_footprint("0=10/10/1431655765,1=20/10/1431655765,2=30/10/1431655766"), 1'234'567);
+}
+
} // storage::distributor
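The overflow test above relies on unsigned wrap-around being well-defined in C++ (arithmetic mod 2^32), so a wrapped sum is always strictly smaller than either operand. A minimal illustration of the saturation trick used by the patch (saturating_add_u32 is a hypothetical helper name, not code from the patch):

#include <cstdint>

// If a + b wraps, the result is smaller than a (unsigned arithmetic is mod 2^32),
// so a single comparison detects overflow without undefined behavior.
uint32_t saturating_add_u32(uint32_t a, uint32_t b) {
    const uint32_t sum = a + b;
    return (sum >= a) ? sum : UINT32_MAX;
}

// 1'431'655'765 + 1'431'655'765 + 1'431'655'766 == 2^32, which would wrap to 0;
// saturating instead yields UINT32_MAX, which the chunk-size cap then clamps
// down to the configured split size (1'234'567 in the test).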
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 7ce034abfee..e40a79b06df 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/hash_set.h>
#include <array>
#include <vespa/log/bufferedlogger.h>
@@ -121,7 +122,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
}
const lib::ClusterState& clusterState(_bucketSpace->getClusterState());
- std::vector<std::unique_ptr<BucketCopy> > newCopies;
+ std::vector<std::unique_ptr<BucketCopy>> newCopies;
std::vector<MergeMetaData> nodes;
for (uint16_t node : getNodes()) {
@@ -139,6 +140,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_mnodes.emplace_back(node._nodeIndex, node._sourceOnly);
}
+ const auto estimated_memory_footprint = estimate_merge_memory_footprint_ubound(nodes);
+
if (_mnodes.size() > 1) {
auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes,
_manager->operation_context().generate_unique_timestamp(),
@@ -153,6 +156,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
} else {
msg->set_use_unordered_forwarding(true);
}
+ msg->set_estimated_memory_footprint(estimated_memory_footprint);
LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index);
@@ -367,6 +371,40 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const
return true;
}
+uint32_t MergeOperation::estimate_merge_memory_footprint_ubound(const std::vector<MergeMetaData>& nodes) const noexcept {
+ vespalib::hash_set<uint32_t> seen_checksums;
+ uint32_t worst_case_footprint_across_nodes = 0;
+ uint32_t largest_single_doc_contribution = 0;
+ for (const auto& node : nodes) {
+ if (!seen_checksums.contains(node.checksum())) {
+ seen_checksums.insert(node.checksum());
+ const uint32_t replica_size = node._copy->getUsedFileSize();
+ // We don't know the overlap of document sets across replicas, so we have to assume the
+ // worst and treat the replicas as entirely disjoint. In this case, the _sum_ of all disjoint
+ // replica group footprints gives us the upper bound.
+ // Note: saturate-on-overflow check requires all types to be _unsigned_ to work.
+ if (worst_case_footprint_across_nodes + replica_size >= worst_case_footprint_across_nodes) {
+ worst_case_footprint_across_nodes += replica_size;
+ } else {
+ worst_case_footprint_across_nodes = UINT32_MAX;
+ }
+ // Special case for not bounding single massive doc replica to that of the max
+ // configured bucket size.
+ if (node._copy->getDocumentCount() == 1) {
+ largest_single_doc_contribution = std::max(replica_size, largest_single_doc_contribution);
+ }
+ }
+ }
+ // We know that simply adding up replica sizes is likely to massively over-count in the common
+ // case (due to the intersection set between replicas rarely being empty), so we cap it by the
+ // max expected merge chunk size (which is expected to be configured equal to the split limit).
+ // _Except_ if we have single-doc replicas, as these are known not to overlap, and we know that
+ // the worst case must be the max of the chunk size and the biggest single doc size.
+ const uint32_t expected_max_merge_chunk_size = _manager->operation_context().distributor_config().getSplitSize();
+ return std::max(std::min(worst_case_footprint_across_nodes, expected_max_merge_chunk_size),
+ largest_single_doc_contribution);
+}
+
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index ff21e3d1594..8833ae9a1ad 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -63,8 +63,9 @@ private:
void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
DistributorStripeMessageSender& sender);
- bool is_global_bucket_merge() const noexcept;
- bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
+ [[nodiscard]] bool is_global_bucket_merge() const noexcept;
+ [[nodiscard]] bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
+ [[nodiscard]] uint32_t estimate_merge_memory_footprint_ubound(const std::vector<MergeMetaData>& nodes) const noexcept;
MergeBucketMetricSet* get_merge_metrics();
};