diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-24 08:45:51 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-24 14:21:17 +0000 |
commit | b3f3f67862dc34b753e93fb614a9fa1e84b68b3f (patch) | |
tree | 0a04d8c0c9a03cef81e3b83c72bd9a5ef8cdca49 /storage/src/tests/storageserver | |
parent | 418c89cd87e29b21bc3e1156203d79dfef1fe424 (diff) |
Support use of dynamic throttling in MergeThrottler component
Now always uses `DynamicThrottlePolicy` instead of `StaticThrottlePolicy`,
but by default hardwires the min/max limits to _functionally_ act
as if a static policy were in place.
If dynamic throttling is enabled, forwarded (unordered) merges are
now busy-bounced instead of immediately accepted into the pending window
if the window size is exhausted. This so that dynamic throttling
windows on nodes earlier in the forwarding chain can be sized down
implicitly (thus the _dynamic_ aspect of it all). This is done to help
control resource usage and content node queue sizes.
Dynamic throttling may be enabled/disabled live via config. If enabled,
the following (live) config is supported:
* Minimum window size
* Maximum window size
* Window size increment
More may be added (and defaults changed) as appropriate.
Diffstat (limited to 'storage/src/tests/storageserver')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 59 |
1 files changed, 31 insertions, 28 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 0f844ab6b4f..89b769078cc 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,18 +1,19 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vespalib/util/document_runnable.h> -#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/storageserver/mergethrottler.h> -#include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/exceptions.h> #include <unordered_set> #include <memory> #include <iterator> @@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test { assert(merge); return merge; } + + [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept; }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -212,13 +215,10 @@ bool checkChain(const StorageMessage::SP& msg, Iterator first, Iterator end) { - const MergeBucketCommand& cmd = - dynamic_cast<const MergeBucketCommand&>(*msg); - + auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg); if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) { return false; } - return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); } @@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout) } +uint32_t +MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept +{ + return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize()); +} + // Extremely simple test that just checks that (min|max)_merges_per_node // under the stor-server config gets propagated to all the nodes TEST_F(MergeThrottlerTest, merges_config) { for (int i = 0; i < _storageNodeCount; ++i) { - EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount()); + EXPECT_EQ(25, throttler_max_merges_pending(i)); EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize()); } } @@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) { TEST_F(MergeThrottlerTest, priority_queuing) { // Fill up all active merges - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); ASSERT_GE(maxPending, 4u); for (size_t i = 0; i < maxPending; ++i) { @@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { // in the queue for a merge that is already known. TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Fill up all active merges and 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) { // order. TEST_F(MergeThrottlerTest, forward_queued_merge) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { DummyStorageLink& bottomLink(*_bottomLinks[1]); // Fill up all active merges and then 3 queued ones - size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(1); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}}); @@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { TEST_F(MergeThrottlerTest, flush) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { // Second, test that we get rejected before queueing up. This is to // avoid a hypothetical deadlock scenario. // Fill up all active merges - { - - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); - for (size_t i = 0; i < maxPending; ++i) { - auto fillCmd = std::make_shared<MergeBucketCommand>( - makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); - _topLinks[0]->sendDown(fillCmd); - } + size_t maxPending = throttler_max_merges_pending(0); + for (size_t i = 0; i < maxPending; ++i) { + auto fillCmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); + _topLinks[0]->sendDown(fillCmd); } _topLinks[0]->sendDown(cmd); @@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){ // Fill up all active merges and then 3 queued ones with the same // system state - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // State is version 1. Send down several merges with state version 2. - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // TODO remove functionality and test TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Fill up all active merges and then 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { } TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) { - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); @@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim { // Note: uses node with index 1 to not be the first node in chain _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); - size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); @@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure) } void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { - size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(throttler_index); for (size_t i = 0; i < max_pending + queued_count; ++i) { _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)) .nodes(throttler_index, throttler_index + 1) |