diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2024-02-05 11:45:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-05 11:45:57 +0100 |
commit | 7f6516e9da613cd7809cdd202e809354563efa0e (patch) | |
tree | 24c8e490c55521aec7c618ebb3e85953fb45acfd | |
parent | b2fc8e4051afa7c5398ba198f34d6141e6a07fed (diff) | |
parent | ba5c7e31a110804bff0276c6569a63f4acb6cb94 (diff) |
Merge pull request #30158 from vespa-engine/balder/disable_queue_limits_for_chained_merges-always-true
disable_queue_limits_for_chained_merges has long been true, GC
10 files changed, 8 insertions, 57 deletions
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 46590d18b40..574f5b04075 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -112,7 +112,6 @@ public class StorageClusterTest { StorServerConfig config = new StorServerConfig(builder); assertEquals(16, config.max_merges_per_node()); assertEquals(100, config.max_merge_queue_size()); - assertTrue(config.disable_queue_limits_for_chained_merges()); } @Test diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index 444c1157cc4..6a11bbcc3f1 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -505,7 +505,6 @@ App::get_options(int argc, char **argv) { "documents", 1, nullptr, 0 }, { "flip-nodes", 1, nullptr, 0 }, { "groups", 1, nullptr, 0 }, - { "ignore-merge-queue-limit", 0, nullptr, 0 }, { "indexing-sequencer", 1, nullptr, 0 }, { "max-merges-per-node", 1, nullptr, 0 }, { "max-merge-queue-size", 1, nullptr, 0 }, @@ -533,7 +532,6 @@ App::get_options(int argc, char **argv) LONGOPT_DOCUMENTS, LONGOPT_FLIP_NODES, LONGOPT_GROUPS, - LONGOPT_IGNORE_MERGE_QUEUE_LIMIT, LONGOPT_INDEXING_SEQUENCER, LONGOPT_MAX_MERGES_PER_NODE, LONGOPT_MAX_MERGE_QUEUE_SIZE, @@ -582,9 +580,6 @@ App::get_options(int argc, char **argv) case LONGOPT_GROUPS: _bm_params.set_groups(atoi(optarg)); break; - case LONGOPT_IGNORE_MERGE_QUEUE_LIMIT: - _bm_params.set_disable_queue_limits_for_chained_merges(true); - break; case LONGOPT_INDEXING_SEQUENCER: _bm_params.set_indexing_sequencer(optarg); break; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index 12f90523760..5b5a25fa00d 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -7,7 +7,6 @@ namespace search::bmcluster { BmClusterParams::BmClusterParams() : _bucket_db_stripe_bits(4), - _disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def _distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def _distributor_stripes(0), _doc_store_chunk_compression_level(9), // Same default as in proton.def diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index 585d6426b87..9c2329dbbf9 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -14,7 +14,6 @@ namespace search::bmcluster { class BmClusterParams { uint32_t _bucket_db_stripe_bits; - bool _disable_queue_limits_for_chained_merges; uint32_t _distributor_merge_busy_wait; uint32_t _distributor_stripes; uint32_t _doc_store_chunk_compression_level; @@ -43,7 +42,6 @@ public: BmClusterParams(); ~BmClusterParams(); uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } - bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; } uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; } uint32_t get_distributor_stripes() const { return _distributor_stripes; } uint32_t get_doc_store_chunk_compression_level() const noexcept { return _doc_store_chunk_compression_level; } @@ -70,7 +68,6 @@ public: bool needs_message_bus() const { return _use_message_bus || _use_document_api; } bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } - void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; } void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; } void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; } void set_doc_store_chunk_compression_level(uint32_t value) { _doc_store_chunk_compression_level = value; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 9bc9a47c59d..d47f6d68e3b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -31,36 +31,27 @@ #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/storage/common/i_storage_chain_builder.h> -#include <vespa/storage/config/config-stor-bouncer.h> -#include <vespa/storage/config/config-stor-communicationmanager.h> #include <vespa/storage/config/config-stor-distributormanager.h> #include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/config/config-stor-status.h> #include <vespa/storage/config/config-stor-visitordispatcher.h> #include <vespa/storage/distributor/bucket_spaces_stats_provider.h> #include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/visiting/config-stor-visitor.h> #include <vespa/storageserver/app/distributorprocess.h> #include <vespa/storageserver/app/servicelayerprocess.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/config-attributes.h> #include <vespa/config-bucketspaces.h> #include <vespa/config-imported-fields.h> #include <vespa/config-indexschema.h> -#include <vespa/config-persistence.h> #include <vespa/config-rank-profiles.h> #include <vespa/config-slobroks.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config-stor-filestor.h> #include <vespa/config-summary.h> #include <vespa/config-upgrading.h> #include <vespa/config/common/configcontext.h> #include <vespa/document/bucket/bucketspace.h> -#include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/document_type_repo_factory.h> #include <vespa/document/repo/documenttyperepo.h> @@ -299,7 +290,6 @@ struct StorageConfigSet messagebus() { stor_distribution = distribution.get_distribution_config(); - stor_server.disableQueueLimitsForChainedMerges = params.get_disable_queue_limits_for_chained_merges(); stor_server.nodeIndex = node_idx; stor_server.isDistributor = distributor; stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index a480ba2740f..cdf203b8a39 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -179,7 +179,7 @@ struct MergeThrottlerTest : Test { std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); + void receive_chained_merge_with_full_queue(bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { auto& queue = _throttlers[throttler_idx]->getMergeQueue(); @@ -1234,10 +1234,9 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist } void -MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain - _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits); size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { @@ -1269,21 +1268,13 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim } TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue()); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); } -TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false)); - - _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); - auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); - EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); -} - TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue()); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); @@ -1293,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) { // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too. - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); // Unordered merge is immediately forwarded to the next node _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 92ae38ea7c6..9e7fb600cae 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -18,6 +18,7 @@ mbus_distributor_node_max_pending_size int default=0 mbus_content_node_max_pending_size int default=0 # Minimum size of packets to compress (0 means no compression) +## TODO Common compression config for mbus and rpc, and consider ZSTD as default mbus.compress.limit int default=1024 restart ## Compression level for packets diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 26b8450ab20..8cd204bcf9f 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -79,14 +79,6 @@ merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648 ## "source only", as merges do not cause mutations on such nodes. resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 -## If true, received merges that have already been accepted into the pending -## merge window on at least one node will not be restricted by the configured -## max_merge_queue_size limit. They will be allowed to enqueue regardless of -## the current queue size. This avoids wasting the time spent being accepted -## into merge windows, which would happen if the merge were to be bounced with -## a busy-reply that would subsequently be unwound through the entire merge chain. -disable_queue_limits_for_chained_merges bool default=true - ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index b99c238f9ab..c14bc6dc5eb 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -12,7 +12,6 @@ #include <vespa/messagebus/error.h> #include <vespa/config/common/exceptions.h> #include <vespa/config/helper/configfetcher.hpp> -#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/string_escape.h> #include <vespa/vespalib/util/stringfmt.h> @@ -206,7 +205,6 @@ MergeThrottler::MergeThrottler( _active_merge_memory_used_bytes(0), _max_merge_memory_usage_bytes(0), // 0 ==> unlimited _use_dynamic_throttling(false), - _disable_queue_limits_for_chained_merges(false), _closing(false) { _throttlePolicy->setMinWindowSize(20); @@ -252,7 +250,6 @@ MergeThrottler::on_configure(const StorServerConfig& new_config) _maxQueueSize = new_config.maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); - _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) { _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes); } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) { @@ -437,8 +434,7 @@ MergeThrottler::enqueue_merge_for_later_processing( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued - const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor(); + const bool is_forwarded_merge = !mergeCmd.from_distributor(); _queue.emplace(msg, _queueSequence++, is_forwarded_merge); _metrics->queueSize.set(static_cast<int64_t>(_queue.size())); } @@ -767,8 +763,7 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co if (cmd.use_unordered_forwarding()) { return cmd.from_distributor(); } - return ((_queue.size() < _maxQueueSize) - || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor())); + return (_queue.size() < _maxQueueSize) || !cmd.from_distributor(); } // Must be run from worker thread @@ -1332,12 +1327,6 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } void -MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept { - std::lock_guard lock(_stateLock); - _disable_queue_limits_for_chained_merges = disable_limits; -} - -void MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept { std::lock_guard lock(_stateLock); _max_merge_memory_usage_bytes = max_memory_bytes; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index e210a8bfb8b..1e791136476 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -185,7 +185,6 @@ private: size_t _active_merge_memory_used_bytes; size_t _max_merge_memory_usage_bytes; bool _use_dynamic_throttling; - bool _disable_queue_limits_for_chained_merges; bool _closing; public: /** @@ -227,7 +226,6 @@ public: // For unit testing only const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } - void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept; void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept; [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept; void set_hw_info_locking(const vespalib::HwInfo& hw_info); |