diff options
Diffstat (limited to 'storage')
4 files changed, 74 insertions, 2 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index dfeaee031ba..fdf13a4cf14 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -146,6 +146,7 @@ struct MergeThrottlerTest : Test { api::ReturnCode::Result expectedResultCode); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); + void receive_chained_merge_with_full_queue(bool disable_queue_limits); }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -1217,7 +1218,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { static_cast<MergeBucketReply&>(*reply).getResult().getResult()); } -TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { +TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) { size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); @@ -1227,6 +1228,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { nodes.push_back(0); nodes.push_back(1); nodes.push_back(2); + // No chain set, i.e. merge command is freshly squeezed from a distributor. 
auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1262,6 +1264,50 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { EXPECT_EQ(1, _throttlers[0]->getMetrics().local.failures.busy.getValue()); } +void +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits) +{ + // Note: uses the node with index 1 so that it is not the first node in the chain + _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); + size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); + // No chain set, i.e. merge command is freshly squeezed from a distributor. + auto cmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); + _topLinks[1]->sendDown(cmd); + } + _topLinks[1]->waitForMessages(max_pending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued, _messageWaitTime); + + // Clear all forwarded merges + _topLinks[1]->getRepliesOnce(); + // Send down another merge with non-empty chain. It should _not_ be busy bounced + // (if limits disabled) as it has already been accepted into another node's merge window. 
+ { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); + auto cmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); + cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0 + _topLinks[1]->sendDown(cmd); + } +} + +TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); +} + +TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false)); + + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector<MergeBucketCommand::Node> nodes; nodes.push_back(1); diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index ceb25e099b2..d2ef9d229bf 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -45,6 +45,14 @@ max_merge_queue_size int default=1024 ## "source only", as merges do not cause mutations on such nodes. resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 +## If true, received merges that have already been accepted into the pending +## merge window on at least one node will not be restricted by the configured +## max_merge_queue_size limit. They will be allowed to enqueue regardless of +## the current queue size. 
This avoids wasting the time spent being accepted +## into merge windows, which would happen if the merge were to be bounced with +## a busy-reply that would subsequently be unwound through the entire merge chain. +disable_queue_limits_for_chained_merges bool default=false + ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false restart diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 1761abfc097..06d49b2155b 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -205,6 +205,7 @@ MergeThrottler::MergeThrottler( _rendezvous(RENDEZVOUS_NONE), _throttle_until_time(), _backpressure_duration(std::chrono::seconds(30)), + _disable_queue_limits_for_chained_merges(false), _closing(false) { _throttlePolicy->setMaxPendingCount(20); @@ -240,6 +241,7 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ _maxQueueSize = newConfig->maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); + _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges; } MergeThrottler::~MergeThrottler() @@ -703,6 +705,13 @@ bool MergeThrottler::backpressure_mode_active() const { return backpressure_mode_active_no_lock(); } +bool MergeThrottler::allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept { + // We let any merge through that has already passed through at least one other node's merge + // window, as that has already taken up a logical resource slot on all those nodes. 
Busy-bouncing + // a merge at that point would undo a great amount of thumb-twiddling and waiting. + return (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty()); +} + // Must be run from worker thread void MergeThrottler::handleMessageDown( @@ -732,7 +741,7 @@ MergeThrottler::handleMessageDown( processCycledMergeCommand(msg, msgGuard); } else if (canProcessNewMerge()) { processNewMergeCommand(msg, msgGuard); - } else if (_queue.size() < _maxQueueSize) { + } else if ((_queue.size() < _maxQueueSize) || allow_merge_with_queue_full(mergeCmd)) { enqueueMerge(msg, msgGuard); // Queue for later processing } else { // No more room at the inn. Return BUSY so that the @@ -1258,6 +1267,12 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } void +MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept { + std::lock_guard lock(_stateLock); + _disable_queue_limits_for_chained_merges = disable_limits; +} + +void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 0c608f29196..adca4ca6a00 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -172,6 +172,7 @@ private: RendezvousState _rendezvous; mutable std::chrono::steady_clock::time_point _throttle_until_time; std::chrono::steady_clock::duration _backpressure_duration; + bool _disable_queue_limits_for_chained_merges; bool _closing; public: /** @@ -209,6 +210,7 @@ public: // For unit testing only const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } + void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; // For unit testing only std::mutex & getMonitor() { 
return _messageLock; } std::mutex & getStateLock() { return _stateLock; } @@ -347,6 +349,7 @@ private: bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const; bool backpressure_mode_active_no_lock() const; void backpressure_bounce_all_queued_merges(MessageGuard& guard); + bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept; void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, |