aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-10-30 14:20:20 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-11-01 15:36:56 +0000
commit6567a48a55f05f2f20c0cc7ae63dc454319c87e1 (patch)
tree81f292dd2c7f3fe9f6d5604bf5de7cca1184f057 /storage/src/tests/storageserver/mergethrottlertest.cpp
parentb04b6184f8d8fe0b9d569caf6ed46c69312c1821 (diff)
Add content node soft limit on max memory used by merges
If configured, the active merge window is limited so that the sum of estimated memory usage for its merges does not go beyond the configured soft memory limit. The window can always fit a minimum of 1 merge regardless of its size to ensure progress in the cluster (thus this is a soft limit, not a hard limit).
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp160
1 files changed, 138 insertions, 22 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7a7f2551c2d..48e0ab186f2 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,17 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig;
vespalib::string _storage("storage");
struct MergeBuilder {
- document::BucketId _bucket;
- api::Timestamp _maxTimestamp;
- std::vector<uint16_t> _nodes;
- std::vector<uint16_t> _chain;
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
std::unordered_set<uint16_t> _source_only;
- uint64_t _clusterStateVersion;
+ uint64_t _clusterStateVersion;
+ uint32_t _memory_usage;
+ bool _unordered;
- MergeBuilder(const document::BucketId& bucket)
+ explicit MergeBuilder(const document::BucketId& bucket)
: _bucket(bucket),
_maxTimestamp(1234),
_chain(),
- _clusterStateVersion(1)
+ _clusterStateVersion(1),
+ _memory_usage(0),
+ _unordered(false)
{
nodes(0, 1, 2);
}
@@ -100,6 +105,14 @@ struct MergeBuilder {
_source_only.insert(node);
return *this;
}
+ MergeBuilder& memory_usage(uint32_t usage_bytes) {
+ _memory_usage = usage_bytes;
+ return *this;
+ }
+ MergeBuilder& unordered(bool is_unordered) {
+ _unordered = is_unordered;
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
@@ -112,6 +125,8 @@ struct MergeBuilder {
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->set_estimated_memory_footprint(_memory_usage);
+ cmd->set_use_unordered_forwarding(_unordered);
return cmd;
}
};
@@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test {
std::vector<DummyStorageLink*> _bottomLinks;
MergeThrottlerTest();
- ~MergeThrottlerTest();
+ ~MergeThrottlerTest() override;
void SetUp() override;
void TearDown() override;
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
- void sendAndExpectReply(
+ void send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+ std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
+
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
@@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
- DummyStorageLink* bottom = new DummyStorageLink;
+ auto* bottom = new DummyStorageLink;
throttler->push_back(std::unique_ptr<StorageLink>(bottom));
_servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
@@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) {
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+ cmd->set_estimated_memory_footprint(456'789);
StorageMessage::SP fwd = cmd;
StorageMessage::SP fwdToExec;
@@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end()));
}
- // Ensure priority, cluster state version and timeout is correctly forwarded
+ // Ensure operation properties are forwarded as expected
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
- EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd);
+ EXPECT_EQ(as_merge.getClusterStateVersion(), 123);
+ EXPECT_EQ(as_merge.getTimeout(), 54321ms);
+ EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789);
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
}
void
-MergeThrottlerTest::sendAndExpectReply(
+MergeThrottlerTest::send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode)
@@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply(
ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult());
}
+std::shared_ptr<api::StorageMessage>
+MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+}
+
TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges)
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
}
@@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat
EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(),
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY));
@@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
+ ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ std::shared_ptr<api::StorageMessage> fwd_cmd;
+ ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
+ // merge and can therefore NOT be enqueued; it must be bounced immediately.
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(document::BucketId(16, 1))
+ .nodes(2, 1, 0).chain(2, 1).unordered(true)
+ .memory_usage(8_Mi).create(),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+
+ // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+ fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine"));
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ // New merge is accepted
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+}
+
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent directly from a distributor and
+ // can therefore be enqueued.
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+}
+
+TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+}
+
+TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
+ // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing
+ // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we
+ // send for below in the test code.
+ ASSERT_LT(throttler_max_merges_pending(0), 1000);
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ // Fill up active window on node 0. Note: these merges do not have any associated memory cost.
+ fill_throttler_queue_with_n_commands(0, 0);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+// TODO define and test auto-deduced max memory usage
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage