summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-11-13 13:58:41 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-11-13 14:13:38 +0000
commit777f614ba7aa54a33de96e277f7029ee391ba276 (patch)
treebece79399d35e6e01342a109af82cc12f45c4715 /storage
parent26285c69cd996e05040b2d147e48d36b9a0ad648 (diff)
Also memory limit throttle enqueued merges
This plugs the hole where merges could enter the active window even if doing so would exceeded the total memory limit, as dequeueing is a separate code path from when a merge is initially evaluated for inclusion in the active window. There is a theoretical head-of-line blocking/queue starvation issue if the merge at the front of the queue has an unrealistically large footprint and the memory limit is unrealistically low. In practice this is not expected to be a problem, and it should never cause merging to stop (at least one merge is always guaranteed to be able to execute). As such, not adding any kind of heuristics to deal with this for now.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp60
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
3 files changed, 65 insertions, 29 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 6f80ffe0727..a480ba2740f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1603,6 +1603,39 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
}
+TEST_F(MergeThrottlerTest, enqueued_merge_not_started_if_insufficient_memory_available) {
+ // See `queued_merges_are_not_counted_towards_memory_usage` test for magic number rationale
+ const auto max_pending = throttler_max_merges_pending(0);
+ ASSERT_LT(max_pending, 1000);
+ ASSERT_GT(max_pending, 1);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
+
+ // Fill up entire active window and enqueue a single merge
+ fill_throttler_queue_with_n_commands(0, 0);
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(11_Mi).create());
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ // Drain all active merges. As long as we have other active merges, the enqueued merge should not
+ // be allowed through since it's too large. Eventually it will hit the "at least one merge must
+ // be allowed at any time regardless of size" exception and is dequeued.
+ for (uint32_t i = 0; i < max_pending; ++i) {
+ auto fwd_cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::OK)); // Unwind reply for completed merge
+
+ if (i < max_pending - 1) {
+ // Merge should still be in the queue, as it requires 11 MiB, and we only have 10 MiB.
+ // It will eventually be executed when the window is empty (see below).
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime);
+ }
+ }
+ // We've freed up the entire send window, so the over-sized merge can finally squeeze through.
+ waitUntilMergeQueueIs(throttler(0), 0, _messageWaitTime);
+}
+
namespace {
vespalib::HwInfo make_mem_info(uint64_t mem_size) {
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 54a4ddbc780..b99c238f9ab 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -419,6 +419,13 @@ MergeThrottler::getNextQueuedMerge()
return entry._msg;
}
+const api::MergeBucketCommand&
+MergeThrottler::peek_merge_queue() const noexcept
+{
+ assert(!_queue.empty());
+ return dynamic_cast<const api::MergeBucketCommand&>(*_queue.begin()->_msg);
+}
+
void
MergeThrottler::enqueue_merge_for_later_processing(
const api::StorageMessage::SP& msg,
@@ -549,45 +556,40 @@ MergeThrottler::rejectOutdatedQueuedMerges(
// If there's a merge queued and the throttling policy allows for
// the merge to be processed, do so.
bool
-MergeThrottler::attemptProcessNextQueuedMerge(
- MessageGuard& msgGuard)
+MergeThrottler::attemptProcessNextQueuedMerge(MessageGuard& msgGuard)
{
- if (!canProcessNewMerge()) {
+ if (_queue.empty()) {
+ return false;
+ }
+ if ( ! (canProcessNewMerge() && accepting_merge_is_within_memory_limits(peek_merge_queue()))) {
// Should never reach a non-sending state when there are
// no to-be-replied merges that can trigger a new processing
assert(!_merges.empty());
return false;
}
+ // If we get here, there must be something to dequeue.
api::StorageMessage::SP msg = getNextQueuedMerge();
- if (msg) {
- // In case of resends and whatnot, it's possible for a merge
- // command to be in the queue while another higher priority
- // command for the same bucket sneaks in front of it and gets
- // a slot. Send BUSY in this case to make the distributor retry
- // later, at which point the existing merge has hopefully gone
- // through and the new one will be effectively a no-op to perform
- if (!isMergeAlreadyKnown(msg)) {
- LOG(spam, "Processing queued merge %s", msg->toString().c_str());
- processNewMergeCommand(msg, msgGuard);
- } else {
- vespalib::asciistream oss;
- oss << "Queued merge " << msg->toString() << " is out of date; it has already "
- "been started by someone else since it was queued";
- LOG(debug, "%s", oss.c_str());
- sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
- api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
- msgGuard, _metrics->chaining);
- }
- return true;
+ assert(msg);
+ // In case of resends and whatnot, it's possible for a merge
+ // command to be in the queue while another higher priority
+ // command for the same bucket sneaks in front of it and gets
+ // a slot. Send BUSY in this case to make the distributor retry
+ // later, at which point the existing merge has hopefully gone
+ // through and the new one will be effectively a no-op to perform
+ if (!isMergeAlreadyKnown(msg)) {
+ LOG(spam, "Processing queued merge %s", msg->toString().c_str());
+ processNewMergeCommand(msg, msgGuard);
} else {
- if (_queue.empty()) {
- LOG(spam, "Queue empty - no merges to process");
- } else {
- LOG(spam, "Merges queued, but throttle policy disallows further merges at this time");
- }
+ vespalib::asciistream oss;
+ oss << "Queued merge " << msg->toString() << " is out of date; it has already "
+ "been started by someone else since it was queued";
+ LOG(debug, "%s", oss.c_str());
+ sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
+ api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
+ msgGuard, _metrics->chaining);
}
- return false;
+ return true;
}
bool
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index a5559c159bf..e210a8bfb8b 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -372,6 +372,7 @@ private:
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] const api::MergeBucketCommand& peek_merge_queue() const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,