summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp  | 160
1 files changed, 138 insertions, 22 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7a7f2551c2d..48e0ab186f2 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,17 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig;
vespalib::string _storage("storage");
struct MergeBuilder {
- document::BucketId _bucket;
- api::Timestamp _maxTimestamp;
- std::vector<uint16_t> _nodes;
- std::vector<uint16_t> _chain;
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
std::unordered_set<uint16_t> _source_only;
- uint64_t _clusterStateVersion;
+ uint64_t _clusterStateVersion;
+ uint32_t _memory_usage;
+ bool _unordered;
- MergeBuilder(const document::BucketId& bucket)
+ explicit MergeBuilder(const document::BucketId& bucket)
: _bucket(bucket),
_maxTimestamp(1234),
_chain(),
- _clusterStateVersion(1)
+ _clusterStateVersion(1),
+ _memory_usage(0),
+ _unordered(false)
{
nodes(0, 1, 2);
}
@@ -100,6 +105,14 @@ struct MergeBuilder {
_source_only.insert(node);
return *this;
}
+ MergeBuilder& memory_usage(uint32_t usage_bytes) {
+ _memory_usage = usage_bytes;
+ return *this;
+ }
+ MergeBuilder& unordered(bool is_unordered) {
+ _unordered = is_unordered;
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
@@ -112,6 +125,8 @@ struct MergeBuilder {
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->set_estimated_memory_footprint(_memory_usage);
+ cmd->set_use_unordered_forwarding(_unordered);
return cmd;
}
};
@@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test {
std::vector<DummyStorageLink*> _bottomLinks;
MergeThrottlerTest();
- ~MergeThrottlerTest();
+ ~MergeThrottlerTest() override;
void SetUp() override;
void TearDown() override;
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
- void sendAndExpectReply(
+ void send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+ std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
+
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
@@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
- DummyStorageLink* bottom = new DummyStorageLink;
+ auto* bottom = new DummyStorageLink;
throttler->push_back(std::unique_ptr<StorageLink>(bottom));
_servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
@@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) {
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+ cmd->set_estimated_memory_footprint(456'789);
StorageMessage::SP fwd = cmd;
StorageMessage::SP fwdToExec;
@@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end()));
}
- // Ensure priority, cluster state version and timeout is correctly forwarded
+ // Ensure operation properties are forwarded as expected
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
- EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd);
+ EXPECT_EQ(as_merge.getClusterStateVersion(), 123);
+ EXPECT_EQ(as_merge.getTimeout(), 54321ms);
+ EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789);
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
}
void
-MergeThrottlerTest::sendAndExpectReply(
+MergeThrottlerTest::send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode)
@@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply(
ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult());
}
+std::shared_ptr<api::StorageMessage>
+MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+}
+
TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges)
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
}
@@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat
EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(),
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY));
@@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
+ ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ std::shared_ptr<api::StorageMessage> fwd_cmd;
+ ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
+ // merge and can therefore NOT be enqueued; it must be bounced immediately.
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(document::BucketId(16, 1))
+ .nodes(2, 1, 0).chain(2, 1).unordered(true)
+ .memory_usage(8_Mi).create(),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+
+ // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+ fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine"));
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ // New merge is accepted
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+}
+
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent directly from a distributor and
+ // can therefore be enqueued.
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+}
+
+TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+}
+
+TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
+ // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing
+ // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we
+ // send for below in the test code.
+ ASSERT_LT(throttler_max_merges_pending(0), 1000);
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ // Fill up active window on node 0. Note: these merges do not have any associated memory cost.
+ fill_throttler_queue_with_n_commands(0, 0);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+// TODO define and test auto-deduced max memory usage
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage