diff options
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 160 |
1 files changed, 138 insertions, 22 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 7a7f2551c2d..48e0ab186f2 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,17 +1,18 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storage/persistence/messages.h> +#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/size_literals.h> #include <unordered_set> #include <memory> #include <iterator> @@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig; vespalib::string _storage("storage"); struct MergeBuilder { - document::BucketId _bucket; - api::Timestamp _maxTimestamp; - std::vector<uint16_t> _nodes; - std::vector<uint16_t> _chain; + document::BucketId _bucket; + api::Timestamp _maxTimestamp; + std::vector<uint16_t> _nodes; + std::vector<uint16_t> _chain; std::unordered_set<uint16_t> _source_only; - uint64_t _clusterStateVersion; + uint64_t _clusterStateVersion; + uint32_t _memory_usage; + bool _unordered; - MergeBuilder(const document::BucketId& bucket) + explicit MergeBuilder(const document::BucketId& bucket) : _bucket(bucket), _maxTimestamp(1234), _chain(), - _clusterStateVersion(1) + _clusterStateVersion(1), + _memory_usage(0), + _unordered(false) { nodes(0, 1, 2); } @@ -100,6 +105,14 @@ struct MergeBuilder { _source_only.insert(node); return *this; } + MergeBuilder& memory_usage(uint32_t usage_bytes) { + _memory_usage = usage_bytes; + return *this; + } + MergeBuilder& unordered(bool is_unordered) { + _unordered = is_unordered; + return *this; + } api::MergeBucketCommand::SP create() const { std::vector<api::MergeBucketCommand::Node> n; @@ -112,6 +125,8 @@ struct MergeBuilder { makeDocumentBucket(_bucket), n, _maxTimestamp, _clusterStateVersion, _chain); cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0])); + cmd->set_estimated_memory_footprint(_memory_usage); + cmd->set_use_unordered_forwarding(_unordered); return cmd; } }; @@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test { std::vector<DummyStorageLink*> _bottomLinks; MergeThrottlerTest(); - ~MergeThrottlerTest(); + ~MergeThrottlerTest() override; void SetUp() override; void TearDown() override; api::MergeBucketCommand::SP sendMerge(const MergeBuilder&); - void sendAndExpectReply( + void send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode); + std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg); + void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void fill_up_throttler_active_window_and_queue(uint16_t node_idx); void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { @@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp() std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister()); + auto* throttler = new MergeThrottler(*config, server->getComponentRegister()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); - DummyStorageLink* bottom = new DummyStorageLink; + auto* bottom = new DummyStorageLink; throttler->push_back(std::unique_ptr<StorageLink>(bottom)); _servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release())); @@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) { cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); const uint16_t distributorIndex = 123; cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded + cmd->set_estimated_memory_footprint(456'789); StorageMessage::SP fwd = cmd; StorageMessage::SP fwdToExec; @@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) { } EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end())); } - // Ensure priority, cluster state version and timeout is correctly forwarded + // Ensure operation properties are forwarded as expected EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); - EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd); + EXPECT_EQ(as_merge.getClusterStateVersion(), 123); + EXPECT_EQ(as_merge.getTimeout(), 54321ms); + EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789); } _topLinks[lastNodeIdx]->sendDown(fwd); @@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) { } void -MergeThrottlerTest::sendAndExpectReply( +MergeThrottlerTest::send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode) @@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply( ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult()); } +std::shared_ptr<api::StorageMessage> +MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg) +{ + _topLinks[0]->sendDown(msg); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); +} + TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) { document::BucketId bucket(16, 1234); std::vector<api::GetBucketDiffCommand::Node> nodes; auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), nodes, api::Timestamp(1234)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject std::vector<api::GetBucketDiffCommand::Node> nodes; auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + applyDiffCmd, api::MessageType::APPLYBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges) auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); } @@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(), + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(bucket).create(), api::MessageType::MERGEBUCKET_REPLY, api::ReturnCode::BUSY)); @@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) { EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) { + ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself + + _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + + ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + + std::shared_ptr<api::StorageMessage> fwd_cmd; + ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered + // merge and can therefore NOT be enqueued; it must be bounced immediately. + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(document::BucketId(16, 1)) + .nodes(2, 1, 0).chain(2, 1).unordered(true) + .memory_usage(8_Mi).create(), + MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY)); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged + + // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in. + auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply(); + fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine")); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + std::shared_ptr<api::StorageReply>(std::move(fwd_reply)), + MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge + + ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + + // New merge is accepted + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1 + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi); +} + +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) { + _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent directly from a distributor and + // can therefore be enqueued. + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create()); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged +} + +TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) { + _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create()); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi); +} + +TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { + // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing + // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we + // send for below in the test code. + ASSERT_LT(throttler_max_merges_pending(0), 1000); + + _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi); + // Fill up active window on node 0. Note: these merges do not have any associated memory cost. + fill_throttler_queue_with_n_commands(0, 0); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create()); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); +} + +// TODO define and test auto-deduced max memory usage + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage |