diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-11-13 15:52:27 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-13 15:52:27 +0100 |
commit | aacfc3aa817c9bcd302aa63b3c23e265e9e84e40 (patch) | |
tree | 4391903441be0a97bc49b8248f215cb449eb1f1f | |
parent | 24c328285d8726f4cf1aefd323c1e5096a6f83e9 (diff) | |
parent | 777f614ba7aa54a33de96e277f7029ee391ba276 (diff) |
Merge pull request #29326 from vespa-engine/vekterli/also-memory-throttle-queued-merges
Also memory limit-throttle enqueued merges
3 files changed, 65 insertions, 29 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 6f80ffe0727..a480ba2740f 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1603,6 +1603,39 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); } +TEST_F(MergeThrottlerTest, enqueued_merge_not_started_if_insufficient_memory_available) { + // See `queued_merges_are_not_counted_towards_memory_usage` test for magic number rationale + const auto max_pending = throttler_max_merges_pending(0); + ASSERT_LT(max_pending, 1000); + ASSERT_GT(max_pending, 1); + throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi); + + // Fill up entire active window and enqueue a single merge + fill_throttler_queue_with_n_commands(0, 0); + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(11_Mi).create()); + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue + + // Drain all active merges. As long as we have other active merges, the enqueued merge should not + // be allowed through since it's too large. Eventually it will hit the "at least one merge must + // be allowed at any time regardless of size" exception and is dequeued. + for (uint32_t i = 0; i < max_pending; ++i) { + auto fwd_cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply(); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + std::shared_ptr<api::StorageReply>(std::move(fwd_reply)), + MessageType::MERGEBUCKET_REPLY, ReturnCode::OK)); // Unwind reply for completed merge + + if (i < max_pending - 1) { + // Merge should still be in the queue, as it requires 11 MiB, and we only have 10 MiB. + // It will eventually be executed when the window is empty (see below). + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); + } + } + // We've freed up the entire send window, so the over-sized merge can finally squeeze through. + waitUntilMergeQueueIs(throttler(0), 0, _messageWaitTime); +} + namespace { vespalib::HwInfo make_mem_info(uint64_t mem_size) { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 54a4ddbc780..b99c238f9ab 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -419,6 +419,13 @@ MergeThrottler::getNextQueuedMerge() return entry._msg; } +const api::MergeBucketCommand& +MergeThrottler::peek_merge_queue() const noexcept +{ + assert(!_queue.empty()); + return dynamic_cast<const api::MergeBucketCommand&>(*_queue.begin()->_msg); +} + void MergeThrottler::enqueue_merge_for_later_processing( const api::StorageMessage::SP& msg, @@ -549,45 +556,40 @@ MergeThrottler::rejectOutdatedQueuedMerges( // If there's a merge queued and the throttling policy allows for // the merge to be processed, do so. bool -MergeThrottler::attemptProcessNextQueuedMerge( - MessageGuard& msgGuard) +MergeThrottler::attemptProcessNextQueuedMerge(MessageGuard& msgGuard) { - if (!canProcessNewMerge()) { + if (_queue.empty()) { + return false; + } + if ( ! (canProcessNewMerge() && accepting_merge_is_within_memory_limits(peek_merge_queue()))) { // Should never reach a non-sending state when there are // no to-be-replied merges that can trigger a new processing assert(!_merges.empty()); return false; } + // If we get here, there must be something to dequeue. api::StorageMessage::SP msg = getNextQueuedMerge(); - if (msg) { - // In case of resends and whatnot, it's possible for a merge - // command to be in the queue while another higher priority - // command for the same bucket sneaks in front of it and gets - // a slot. Send BUSY in this case to make the distributor retry - // later, at which point the existing merge has hopefully gone - // through and the new one will be effectively a no-op to perform - if (!isMergeAlreadyKnown(msg)) { - LOG(spam, "Processing queued merge %s", msg->toString().c_str()); - processNewMergeCommand(msg, msgGuard); - } else { - vespalib::asciistream oss; - oss << "Queued merge " << msg->toString() << " is out of date; it has already " - "been started by someone else since it was queued"; - LOG(debug, "%s", oss.c_str()); - sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg), - api::ReturnCode(api::ReturnCode::BUSY, oss.str()), - msgGuard, _metrics->chaining); - } - return true; + assert(msg); + // In case of resends and whatnot, it's possible for a merge + // command to be in the queue while another higher priority + // command for the same bucket sneaks in front of it and gets + // a slot. Send BUSY in this case to make the distributor retry + // later, at which point the existing merge has hopefully gone + // through and the new one will be effectively a no-op to perform + if (!isMergeAlreadyKnown(msg)) { + LOG(spam, "Processing queued merge %s", msg->toString().c_str()); + processNewMergeCommand(msg, msgGuard); } else { - if (_queue.empty()) { - LOG(spam, "Queue empty - no merges to process"); - } else { - LOG(spam, "Merges queued, but throttle policy disallows further merges at this time"); - } + vespalib::asciistream oss; + oss << "Queued merge " << msg->toString() << " is out of date; it has already " + "been started by someone else since it was queued"; + LOG(debug, "%s", oss.c_str()); + sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg), + api::ReturnCode(api::ReturnCode::BUSY, oss.str()), + msgGuard, _metrics->chaining); } - return false; + return true; } bool diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index a5559c159bf..e210a8bfb8b 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -372,6 +372,7 @@ private: [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept; [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept; [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept; + [[nodiscard]] const api::MergeBucketCommand& peek_merge_queue() const noexcept; void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, |