author    Tor Brede Vekterli <vekterli@yahooinc.com>  2022-01-24 08:45:51 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com>  2022-01-24 14:21:17 +0000
commit    b3f3f67862dc34b753e93fb614a9fa1e84b68b3f (patch)
tree      0a04d8c0c9a03cef81e3b83c72bd9a5ef8cdca49 /storage/src/tests/storageserver
parent    418c89cd87e29b21bc3e1156203d79dfef1fe424 (diff)
Support use of dynamic throttling in MergeThrottler component
Now always uses `DynamicThrottlePolicy` instead of `StaticThrottlePolicy`, but by default hardwires the min/max limits to _functionally_ act as if a static policy were in place.

If dynamic throttling is enabled, forwarded (unordered) merges are now busy-bounced instead of immediately accepted into the pending window if the window size is exhausted. This is so that dynamic throttling windows on nodes earlier in the forwarding chain can be sized down implicitly (thus the _dynamic_ aspect of it all). This is done to help control resource usage and content node queue sizes.

Dynamic throttling may be enabled/disabled live via config. If enabled, the following (live) config is supported:

* Minimum window size
* Maximum window size
* Window size increment

More may be added (and defaults changed) as appropriate.
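As a reading aid for the behaviour described above, here is a minimal sketch of how a dynamic policy can be pinned so that it behaves statically. It is not taken from the commit; `mbus::DynamicThrottlePolicy` is the type whose header the test below includes, but the setter names used here (`setMinWindowSize`, `setMaxWindowSize`, `setWindowSizeIncrement`) are assumptions for illustration only:

    #include <vespa/messagebus/dynamicthrottlepolicy.h>
    #include <cstdint>

    // Sketch only (assumed setter names): with min == max the window can
    // neither grow nor shrink, so the dynamic policy degenerates into a
    // fixed pending-merge limit, which is the "hardwired" default above.
    void pin_window_to_static_limit(mbus::DynamicThrottlePolicy& policy,
                                    uint32_t max_merges_per_node) {
        policy.setMinWindowSize(max_merges_per_node);
        policy.setMaxWindowSize(max_merges_per_node);
    }

    // If dynamic throttling is enabled via live config, the same knobs would
    // instead track the configured minimum window size, maximum window size
    // and window size increment:
    //     policy.setMinWindowSize(cfg.min_window_size);
    //     policy.setMaxWindowSize(cfg.max_window_size);
    //     policy.setWindowSizeIncrement(cfg.window_size_increment);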
Diffstat (limited to 'storage/src/tests/storageserver')
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp  59
1 file changed, 31 insertions(+), 28 deletions(-)
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 0f844ab6b4f..89b769078cc 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,18 +1,19 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/storageserver/mergethrottler.h>
-#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test {
assert(merge);
return merge;
}
+
+ [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept;
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -212,13 +215,10 @@ bool
checkChain(const StorageMessage::SP& msg,
Iterator first, Iterator end)
{
- const MergeBucketCommand& cmd =
- dynamic_cast<const MergeBucketCommand&>(*msg);
-
+ auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg);
if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
-
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
@@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
}
+uint32_t
+MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept
+{
+ return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize());
+}
+
// Extremely simple test that just checks that (min|max)_merges_per_node
// under the stor-server config gets propagated to all the nodes
TEST_F(MergeThrottlerTest, merges_config) {
for (int i = 0; i < _storageNodeCount; ++i) {
- EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount());
+ EXPECT_EQ(25, throttler_max_merges_pending(i));
EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize());
}
}
@@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
for (size_t i = 0; i < maxPending; ++i) {
@@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
// in the queue for a merge that is already known.
TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) {
// order.
TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
DummyStorageLink& bottomLink(*_bottomLinks[1]);
// Fill up all active merges and then 3 queued ones
- size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(1);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
@@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
// Second, test that we get rejected before queueing up. This is to
// avoid a hypothetical deadlock scenario.
// Fill up all active merges
- {
-
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- for (size_t i = 0; i < maxPending; ++i) {
- auto fillCmd = std::make_shared<MergeBucketCommand>(
- makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
- _topLinks[0]->sendDown(fillCmd);
- }
+ size_t maxPending = throttler_max_merges_pending(0);
+ for (size_t i = 0; i < maxPending; ++i) {
+ auto fillCmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
+ _topLinks[0]->sendDown(fillCmd);
}
_topLinks[0]->sendDown(cmd);
@@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){
// Fill up all active merges and then 3 queued ones with the same
// system state
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// State is version 1. Send down several merges with state version 2.
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// TODO remove functionality and test
TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
}
TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) {
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
@@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
- size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
@@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
}
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
- size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(throttler_index);
for (size_t i = 0; i < max_pending + queued_count; ++i) {
_topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
.nodes(throttler_index, throttler_index + 1)