diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-10-31 13:06:05 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-11-02 10:29:22 +0000 |
commit | c87ae9ba1b1f1745191e75edefcaf436af3ab0c6 (patch) | |
tree | 091c4a7db0bf6034f14c5cd90de2428029ba0cef /storage/src | |
parent | 6567a48a55f05f2f20c0cc7ae63dc454319c87e1 (diff) |
Heuristically compute expected merge memory usage upper bound
The distributor only knows a limited amount of metadata per
bucket replica (roughly: checksum, doc count, doc size). It
therefore has no way to know if two replicas with different
checksums, both with 1000 documents, have 999 or 0 documents
in common. We therefore have to assume the worst and estimate
the worst-case memory usage as being the _sum_ of mutually
divergent replica sizes.
Estimates are bounded by the expected bucket merge chunk size,
as we make the simplifying assumption that memory usage for
a particular node is (roughly) limited to this value for any
given bucket.
One special-cased exception to this is single-document replicas,
as one document cannot be split across multiple chunks by
definition. Here we track the largest single document replica.
Diffstat (limited to 'storage/src')
3 files changed, 106 insertions, 10 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 0773958e535..6ed05e14519 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -45,6 +45,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { void assert_simple_merge_bucket_command(); void assert_simple_delete_bucket_command(); MergeBucketMetricSet& get_merge_metrics(); + [[nodiscard]] uint32_t merge_footprint(const std::string& db_state); }; std::shared_ptr<MergeOperation> @@ -86,7 +87,7 @@ MergeOperationTest::assert_simple_merge_bucket_command() { ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " - "reasons to start: ) => 0", + "estimated memory footprint: 2 bytes, reasons to start: ) => 0", _sender.getLastCommand(true)); } @@ -295,7 +296,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " - "reasons to start: ) => 0"); + "estimated memory footprint: 2 bytes, reasons to start: ) => 0"); ASSERT_EQ(merge, _sender.getLastCommand(true)); @@ -356,8 +357,8 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) { std::string merge( "MergeBucketCommand(BucketId(0x4000000000000001), to time " - "10000000, cluster state version: 0, nodes: [0, 2, 1 " - "(source only)], chain: [], reasons to start: ) => 0"); + "10000000, cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " + "estimated memory footprint: 2 bytes, reasons to start: ) => 0"); ASSERT_EQ(merge, _sender.getLastCommand(true)); sendReply(op); @@ -580,14 +581,14 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all setup_simple_merge_op({1, 2, 3}); // Note: 
these will be re-ordered in ideal state order internally ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [2, 1, 3], chain: [], " - "reasons to start: ) => 1", + "estimated memory footprint: 2 bytes, reasons to start: ) => 1", _sender.getLastCommand(true)); // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2) setup_simple_merge_op({1, 2}); ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, " "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), " - "reasons to start: ) => 2", + "estimated memory footprint: 2 bytes, reasons to start: ) => 2", _sender.getLastCommand(true)); _sender.clear(); @@ -600,7 +601,7 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all setup_simple_merge_op({2, 1}); ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, " "cluster state version: 0, nodes: [2, 1], chain: [], " - "reasons to start: ) => 1", + "estimated memory footprint: 2 bytes, reasons to start: ) => 1", _sender.getLastCommand(true)); } @@ -644,4 +645,60 @@ TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) { EXPECT_FALSE(op->ok()); } +uint32_t MergeOperationTest::merge_footprint(const std::string& db_state) { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), db_state); + enable_cluster_state("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + _sender.clear(); + op.start(_sender); + assert(!_sender.commands().empty()); + auto cmd_as_merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]); + assert(cmd_as_merge); + return cmd_as_merge->estimated_memory_footprint(); +} + +TEST_F(MergeOperationTest, 
memory_footprint_is_computed_from_replica_state) { + // Reminder of syntax: "index=checksum/doc count/doc size" + // {0,2} in sync, {1} out of sync; footprint is sum across "sync-ness groups" + EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=10/100/3000"), 10'000); + EXPECT_EQ(merge_footprint("0=10/100/7000,1=20/200/3000,2=10/100/7000"), 10'000); + // All replicas mutually out of sync + EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=30/100/5000"), 15'000); + // One replica empty + EXPECT_EQ(merge_footprint("0=20/200/4000,1=20/200/4000,2=1/0/0"), 4'000); +} + +TEST_F(MergeOperationTest, memory_footprint_is_bounded_by_max_expected_merge_chunk_size) { + auto cfg = make_config(); + cfg->setSplitSize(20'000); // proxy for max merge chunk size + configure_stripe(cfg); + + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/9999"), 19'999); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10000"), 20'000); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10001"), 20'000); + EXPECT_EQ(merge_footprint("0=10/100/6000,1=20/200/7000,2=30/100/20000"), 20'000); +} + +TEST_F(MergeOperationTest, memory_footprint_with_single_doc_replica_can_be_greater_than_max_expected_bucket_size) { + auto cfg = make_config(); + cfg->setSplitSize(20'000); + configure_stripe(cfg); + + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/1/50000"), 50'000); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/1/60000,2=30/1/50000"), 60'000); +} + +TEST_F(MergeOperationTest, memory_footprint_estimation_saturates_instead_of_overflowing_u32_limits) { + auto cfg = make_config(); + cfg->setSplitSize(1'234'567); + configure_stripe(cfg); + // Here we massively _undercount_ our estimate, but this is a wildly unrealistic replica state + // just for testing correctness of arithmetic ops. + // UINT32_MAX/3 * 3 + 1 will overflow to 0 if unchecked. Must be saturated instead. 
+ EXPECT_EQ(merge_footprint("0=10/10/1431655765,1=20/10/1431655765,2=30/10/1431655766"), 1'234'567); +} + } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7ce034abfee..e40a79b06df 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -9,6 +9,7 @@ #include <vespa/storageframework/generic/clock/clock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/hash_set.h> #include <array> #include <vespa/log/bufferedlogger.h> @@ -121,7 +122,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) } const lib::ClusterState& clusterState(_bucketSpace->getClusterState()); - std::vector<std::unique_ptr<BucketCopy> > newCopies; + std::vector<std::unique_ptr<BucketCopy>> newCopies; std::vector<MergeMetaData> nodes; for (uint16_t node : getNodes()) { @@ -139,6 +140,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _mnodes.emplace_back(node._nodeIndex, node._sourceOnly); } + const auto estimated_memory_footprint = estimate_merge_memory_footprint_ubound(nodes); + if (_mnodes.size() > 1) { auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes, _manager->operation_context().generate_unique_timestamp(), @@ -153,6 +156,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) } else { msg->set_use_unordered_forwarding(true); } + msg->set_estimated_memory_footprint(estimated_memory_footprint); LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); @@ -367,6 +371,40 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const return true; } +uint32_t MergeOperation::estimate_merge_memory_footprint_ubound(const 
std::vector<MergeMetaData>& nodes) const noexcept { + vespalib::hash_set<uint32_t> seen_checksums; + uint32_t worst_case_footprint_across_nodes = 0; + uint32_t largest_single_doc_contribution = 0; + for (const auto& node : nodes) { + if (!seen_checksums.contains(node.checksum())) { + seen_checksums.insert(node.checksum()); + const uint32_t replica_size = node._copy->getUsedFileSize(); + // We don't know the overlap of document sets across replicas, so we have to assume the + // worst and treat the replicas as entirely disjoint. In this case, the _sum_ of all disjoint + // replica group footprints gives us the upper bound. + // Note: saturate-on-overflow check requires all types to be _unsigned_ to work. + if (worst_case_footprint_across_nodes + replica_size >= worst_case_footprint_across_nodes) { + worst_case_footprint_across_nodes += replica_size; + } else { + worst_case_footprint_across_nodes = UINT32_MAX; + } + // Special case for not bounding single massive doc replica to that of the max + // configured bucket size. + if (node._copy->getDocumentCount() == 1) { + largest_single_doc_contribution = std::max(replica_size, largest_single_doc_contribution); + } + } + } + // We know that simply adding up replica sizes is likely to massively over-count in the common + // case (due to the intersection set between replicas rarely being empty), so we cap it by the + // max expected merge chunk size (which is expected to be configured equal to the split limit). + // _Except_ if we have single-doc replicas, as these are known not to overlap, and we know that + // the worst case must be the max of the chunk size and the biggest single doc size. 
+ const uint32_t expected_max_merge_chunk_size = _manager->operation_context().distributor_config().getSplitSize(); + return std::max(std::min(worst_case_footprint_across_nodes, expected_max_merge_chunk_size), + largest_single_doc_contribution); +} + MergeBucketMetricSet* MergeOperation::get_merge_metrics() { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index ff21e3d1594..8833ae9a1ad 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -63,8 +63,9 @@ private: void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState, DistributorStripeMessageSender& sender); - bool is_global_bucket_merge() const noexcept; - bool all_involved_nodes_support_unordered_merge_chaining() const noexcept; + [[nodiscard]] bool is_global_bucket_merge() const noexcept; + [[nodiscard]] bool all_involved_nodes_support_unordered_merge_chaining() const noexcept; + [[nodiscard]] uint32_t estimate_merge_memory_footprint_ubound(const std::vector<MergeMetaData>& nodes) const noexcept; MergeBucketMetricSet* get_merge_metrics(); }; |