aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-10-05 12:32:45 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-10-05 12:32:45 +0000
commitae8b2f0f4684f29c4a9198fc3de227a0e19c2e99 (patch)
tree72c3389c7e9e07005c01663aa33dbed02d8bd0a7 /storage
parent5fa5119410891ef60feafc77a5688b530da550aa (diff)
Make ignoring queue limit for forwarded merges configurable
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp21
-rw-r--r--storage/src/vespa/storage/config/stor-server.def8
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp20
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h3
4 files changed, 46 insertions, 6 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7325840dbb0..fdf13a4cf14 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -146,6 +146,7 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits);
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -1263,8 +1264,11 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
EXPECT_EQ(1, _throttlers[0]->getMetrics().local.failures.busy.getValue());
}
-TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full) {
+void
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+{
// Note: uses node with index 1 to not be the first node in chain
+ _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1280,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_fu
// Clear all forwarded merges
_topLinks[1]->getRepliesOnce();
// Send down another merge with non-empty chain. It should _not_ be busy bounced
- // as it has already been accepted into another node's merge window.
+ // (if limits disabled) as it has already been accepted into another node's merge window.
{
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
@@ -1288,9 +1292,22 @@ TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_fu
cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
_topLinks[1]->sendDown(cmd);
}
+}
+
+TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
}
+TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false));
+
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+ auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
std::vector<MergeBucketCommand::Node> nodes;
nodes.push_back(1);
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index ceb25e099b2..5f21e7bfdf5 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -45,6 +45,14 @@ max_merge_queue_size int default=1024
## "source only", as merges do not cause mutations on such nodes.
resource_exhaustion_merge_back_pressure_duration_secs double default=30.0
+## If true, received merges that have already been accepted into the pending
+## merge window on at least one node will not be restricted by the configured
+## max_merge_queue_size limit. They will be allowed to enqueue regardless of
+## the current queue size. This avoids wasting the time spent being accepted
+## into merge windows, which would happen if the merge were to be bounced with
+## a busy-reply that would subsequently be unwound throuh the entire merge chain.
+disable_queue_limits_for_chained_merges bool default=false
+
## Whether the deadlock detector should be enabled or not. If disabled, it will
## still run, but it will never actually abort the process it is running in.
enable_dead_lock_detector bool default=false restart
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index c2c806c33ac..06d49b2155b 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -205,6 +205,7 @@ MergeThrottler::MergeThrottler(
_rendezvous(RENDEZVOUS_NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _disable_queue_limits_for_chained_merges(false),
_closing(false)
{
_throttlePolicy->setMaxPendingCount(20);
@@ -240,6 +241,7 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
_maxQueueSize = newConfig->maxMergeQueueSize;
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs));
+ _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges;
}
MergeThrottler::~MergeThrottler()
@@ -703,6 +705,13 @@ bool MergeThrottler::backpressure_mode_active() const {
return backpressure_mode_active_no_lock();
}
+bool MergeThrottler::allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept {
+ // We let any merge through that has already passed through at least one other node's merge
+ // window, as that has already taken up a logical resource slot on all those nodes. Busy-bouncing
+ // a merge at that point would undo a great amount of thumb-twiddling and waiting.
+ return (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty());
+}
+
// Must be run from worker thread
void
MergeThrottler::handleMessageDown(
@@ -732,10 +741,7 @@ MergeThrottler::handleMessageDown(
processCycledMergeCommand(msg, msgGuard);
} else if (canProcessNewMerge()) {
processNewMergeCommand(msg, msgGuard);
- } else if ((_queue.size() < _maxQueueSize) || !mergeCmd.getChain().empty()) {
- // We let any merge through that has already passed through at least one other node's merge
- // window, as that has already taken up a logical resource slot on all those nodes. Busy-bouncing
- // a merge at that point would undo a great amount of thumb-twiddling and waiting.
+ } else if ((_queue.size() < _maxQueueSize) || allow_merge_with_queue_full(mergeCmd)) {
enqueueMerge(msg, msgGuard); // Queue for later processing
} else {
// No more room at the inn. Return BUSY so that the
@@ -1261,6 +1267,12 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
+MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept {
+ std::lock_guard lock(_stateLock);
+ _disable_queue_limits_for_chained_merges = disable_limits;
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 0c608f29196..adca4ca6a00 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -172,6 +172,7 @@ private:
RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
std::chrono::steady_clock::duration _backpressure_duration;
+ bool _disable_queue_limits_for_chained_merges;
bool _closing;
public:
/**
@@ -209,6 +210,7 @@ public:
// For unit testing only
const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
+ void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
// For unit testing only
std::mutex & getMonitor() { return _messageLock; }
std::mutex & getStateLock() { return _stateLock; }
@@ -347,6 +349,7 @@ private:
bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
+ bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,