aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp60
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
3 files changed, 65 insertions, 29 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 6f80ffe0727..a480ba2740f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1603,6 +1603,39 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
}
+TEST_F(MergeThrottlerTest, enqueued_merge_not_started_if_insufficient_memory_available) {
+ // See `queued_merges_are_not_counted_towards_memory_usage` test for magic number rationale
+ const auto max_pending = throttler_max_merges_pending(0);
+ ASSERT_LT(max_pending, 1000);
+ ASSERT_GT(max_pending, 1);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
+
+ // Fill up entire active window and enqueue a single merge
+ fill_throttler_queue_with_n_commands(0, 0);
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(11_Mi).create());
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ // Drain all active merges. As long as we have other active merges, the enqueued merge should not
+ // be allowed through since it's too large. Eventually it will hit the "at least one merge must
+ // be allowed at any time regardless of size" exception and is dequeued.
+ for (uint32_t i = 0; i < max_pending; ++i) {
+ auto fwd_cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::OK)); // Unwind reply for completed merge
+
+ if (i < max_pending - 1) {
+ // Merge should still be in the queue, as it requires 11 MiB, and we only have 10 MiB.
+ // It will eventually be executed when the window is empty (see below).
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime);
+ }
+ }
+ // We've freed up the entire send window, so the over-sized merge can finally squeeze through.
+ waitUntilMergeQueueIs(throttler(0), 0, _messageWaitTime);
+}
+
namespace {
vespalib::HwInfo make_mem_info(uint64_t mem_size) {
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 54a4ddbc780..b99c238f9ab 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -419,6 +419,13 @@ MergeThrottler::getNextQueuedMerge()
return entry._msg;
}
+const api::MergeBucketCommand&
+MergeThrottler::peek_merge_queue() const noexcept
+{
+ assert(!_queue.empty());
+ return dynamic_cast<const api::MergeBucketCommand&>(*_queue.begin()->_msg);
+}
+
void
MergeThrottler::enqueue_merge_for_later_processing(
const api::StorageMessage::SP& msg,
@@ -549,45 +556,40 @@ MergeThrottler::rejectOutdatedQueuedMerges(
// If there's a merge queued and the throttling policy allows for
// the merge to be processed, do so.
bool
-MergeThrottler::attemptProcessNextQueuedMerge(
- MessageGuard& msgGuard)
+MergeThrottler::attemptProcessNextQueuedMerge(MessageGuard& msgGuard)
{
- if (!canProcessNewMerge()) {
+ if (_queue.empty()) {
+ return false;
+ }
+ if ( ! (canProcessNewMerge() && accepting_merge_is_within_memory_limits(peek_merge_queue()))) {
// Should never reach a non-sending state when there are
// no to-be-replied merges that can trigger a new processing
assert(!_merges.empty());
return false;
}
+ // If we get here, there must be something to dequeue.
api::StorageMessage::SP msg = getNextQueuedMerge();
- if (msg) {
- // In case of resends and whatnot, it's possible for a merge
- // command to be in the queue while another higher priority
- // command for the same bucket sneaks in front of it and gets
- // a slot. Send BUSY in this case to make the distributor retry
- // later, at which point the existing merge has hopefully gone
- // through and the new one will be effectively a no-op to perform
- if (!isMergeAlreadyKnown(msg)) {
- LOG(spam, "Processing queued merge %s", msg->toString().c_str());
- processNewMergeCommand(msg, msgGuard);
- } else {
- vespalib::asciistream oss;
- oss << "Queued merge " << msg->toString() << " is out of date; it has already "
- "been started by someone else since it was queued";
- LOG(debug, "%s", oss.c_str());
- sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
- api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
- msgGuard, _metrics->chaining);
- }
- return true;
+ assert(msg);
+ // In case of resends and whatnot, it's possible for a merge
+ // command to be in the queue while another higher priority
+ // command for the same bucket sneaks in front of it and gets
+ // a slot. Send BUSY in this case to make the distributor retry
+ // later, at which point the existing merge has hopefully gone
+ // through and the new one will be effectively a no-op to perform
+ if (!isMergeAlreadyKnown(msg)) {
+ LOG(spam, "Processing queued merge %s", msg->toString().c_str());
+ processNewMergeCommand(msg, msgGuard);
} else {
- if (_queue.empty()) {
- LOG(spam, "Queue empty - no merges to process");
- } else {
- LOG(spam, "Merges queued, but throttle policy disallows further merges at this time");
- }
+ vespalib::asciistream oss;
+ oss << "Queued merge " << msg->toString() << " is out of date; it has already "
+ "been started by someone else since it was queued";
+ LOG(debug, "%s", oss.c_str());
+ sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
+ api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
+ msgGuard, _metrics->chaining);
}
- return false;
+ return true;
}
bool
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index a5559c159bf..e210a8bfb8b 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -372,6 +372,7 @@ private:
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] const api::MergeBucketCommand& peek_merge_queue() const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,