aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2024-02-02 21:49:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2024-02-02 21:49:33 +0000
commit2bd4ba190d12ab591fb18f8225c9567305a1eb1f (patch)
tree3faacd744acbb6b6ba8a6e9cd89f6a51f612af3e /storage
parent37b1e8d50edaf785a3d87f353c9906627fbda34f (diff)
disable_queue_limits_for_chained_merges has long been true, GC
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp19
-rw-r--r--storage/src/vespa/storage/config/stor-server.def8
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h2
4 files changed, 7 insertions, 37 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index a480ba2740f..cdf203b8a39 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -179,7 +179,7 @@ struct MergeThrottlerTest : Test {
std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
+ void receive_chained_merge_with_full_queue(bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1234,10 +1234,9 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
- _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits);
size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1269,21 +1268,13 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
}
TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue());
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
}
-TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false));
-
- _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
- auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
- EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
-}
-
TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue());
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
@@ -1293,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
// Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
// Unordered merge is immediately forwarded to the next node
_topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 26b8450ab20..8cd204bcf9f 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -79,14 +79,6 @@ merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648
## "source only", as merges do not cause mutations on such nodes.
resource_exhaustion_merge_back_pressure_duration_secs double default=30.0
-## If true, received merges that have already been accepted into the pending
-## merge window on at least one node will not be restricted by the configured
-## max_merge_queue_size limit. They will be allowed to enqueue regardless of
-## the current queue size. This avoids wasting the time spent being accepted
-## into merge windows, which would happen if the merge were to be bounced with
-## a busy-reply that would subsequently be unwound through the entire merge chain.
-disable_queue_limits_for_chained_merges bool default=true
-
## Whether the deadlock detector should be enabled or not. If disabled, it will
## still run, but it will never actually abort the process it is running in.
enable_dead_lock_detector bool default=false
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index b99c238f9ab..c14bc6dc5eb 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -12,7 +12,6 @@
#include <vespa/messagebus/error.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/config/helper/configfetcher.hpp>
-#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/string_escape.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -206,7 +205,6 @@ MergeThrottler::MergeThrottler(
_active_merge_memory_used_bytes(0),
_max_merge_memory_usage_bytes(0), // 0 ==> unlimited
_use_dynamic_throttling(false),
- _disable_queue_limits_for_chained_merges(false),
_closing(false)
{
_throttlePolicy->setMinWindowSize(20);
@@ -252,7 +250,6 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_maxQueueSize = new_config.maxMergeQueueSize;
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
- _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) {
_max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes);
} else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) {
@@ -437,8 +434,7 @@ MergeThrottler::enqueue_merge_for_later_processing(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued
- const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor();
+ const bool is_forwarded_merge = !mergeCmd.from_distributor();
_queue.emplace(msg, _queueSequence++, is_forwarded_merge);
_metrics->queueSize.set(static_cast<int64_t>(_queue.size()));
}
@@ -767,8 +763,7 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co
if (cmd.use_unordered_forwarding()) {
return cmd.from_distributor();
}
- return ((_queue.size() < _maxQueueSize)
- || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor()));
+ return (_queue.size() < _maxQueueSize) || !cmd.from_distributor();
}
// Must be run from worker thread
@@ -1332,12 +1327,6 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
-MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept {
- std::lock_guard lock(_stateLock);
- _disable_queue_limits_for_chained_merges = disable_limits;
-}
-
-void
MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept {
std::lock_guard lock(_stateLock);
_max_merge_memory_usage_bytes = max_memory_bytes;
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index e210a8bfb8b..1e791136476 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -185,7 +185,6 @@ private:
size_t _active_merge_memory_used_bytes;
size_t _max_merge_memory_usage_bytes;
bool _use_dynamic_throttling;
- bool _disable_queue_limits_for_chained_merges;
bool _closing;
public:
/**
@@ -227,7 +226,6 @@ public:
// For unit testing only
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
- void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept;
void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept;
[[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept;
void set_hw_info_locking(const vespalib::HwInfo& hw_info);