diff options
Diffstat (limited to 'storage')
4 files changed, 182 insertions, 135 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 0f844ab6b4f..89b769078cc 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,18 +1,19 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vespalib/util/document_runnable.h> -#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/storageserver/mergethrottler.h> -#include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/exceptions.h> #include <unordered_set> #include <memory> #include <iterator> @@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test { assert(merge); return merge; } + + [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept; }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -212,13 +215,10 @@ bool checkChain(const StorageMessage::SP& msg, Iterator first, Iterator end) { - const MergeBucketCommand& cmd = - dynamic_cast<const MergeBucketCommand&>(*msg); - + auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg); if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) { return 
false; } - return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); } @@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout) } +uint32_t +MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept +{ + return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize()); +} + // Extremely simple test that just checks that (min|max)_merges_per_node // under the stor-server config gets propagated to all the nodes TEST_F(MergeThrottlerTest, merges_config) { for (int i = 0; i < _storageNodeCount; ++i) { - EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount()); + EXPECT_EQ(25, throttler_max_merges_pending(i)); EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize()); } } @@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) { TEST_F(MergeThrottlerTest, priority_queuing) { // Fill up all active merges - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); ASSERT_GE(maxPending, 4u); for (size_t i = 0; i < maxPending; ++i) { @@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { // in the queue for a merge that is already known. TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Fill up all active merges and 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) { // order. 
TEST_F(MergeThrottlerTest, forward_queued_merge) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); @@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { DummyStorageLink& bottomLink(*_bottomLinks[1]); // Fill up all active merges and then 3 queued ones - size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(1); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}}); @@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { TEST_F(MergeThrottlerTest, flush) { // Fill up all active merges and then 3 queued ones - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 3; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { // Second, test that we get rejected before queueing up. This is to // avoid a hypothetical deadlock scenario. 
// Fill up all active merges - { - - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); - for (size_t i = 0; i < maxPending; ++i) { - auto fillCmd = std::make_shared<MergeBucketCommand>( - makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); - _topLinks[0]->sendDown(fillCmd); - } + size_t maxPending = throttler_max_merges_pending(0); + for (size_t i = 0; i < maxPending; ++i) { + auto fillCmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); + _topLinks[0]->sendDown(fillCmd); } _topLinks[0]->sendDown(cmd); @@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){ // Fill up all active merges and then 3 queued ones with the same // system state - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // State is version 1. Send down several merges with state version 2. 
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; for (size_t i = 0; i < maxPending + 3; ++i) { @@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { // TODO remove functionality and test TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Fill up all active merges and then 1 queued one - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); ASSERT_LT(maxPending, 100); for (size_t i = 0; i < maxPending + 1; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); @@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { } TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) { - size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = throttler_max_merges_pending(0); size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); @@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim { // Note: uses node with index 1 to not be the first node in chain _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); - size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); @@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure) } void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { - size_t 
max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); + size_t max_pending = throttler_max_merges_pending(throttler_index); for (size_t i = 0; i < max_pending + queued_count; ++i) { _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)) .nodes(throttler_index, throttler_index + 1) diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index a368c2e5b6f..00be2929229 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -33,9 +33,19 @@ node_reliability int default=1 restart ## A merge operation will be chained through all nodes involved in the ## merge, only actually starting the operation when every node has ## allowed it to pass through. +## NOTE: max_merges_per_node is _not_ used to size the merge window if merge_throttling_policy.type +## is configured to DYNAMIC (see below); max_merge_queue_size applies regardless. max_merges_per_node int default=16 max_merge_queue_size int default=100 +## Chooses the throttling policy used to control the active merge window size +## of the MergeThrottler component. +merge_throttling_policy.type enum { STATIC, DYNAMIC } default=STATIC +## Only used if merge_throttling_policy.type == DYNAMIC: +merge_throttling_policy.min_window_size int default=16 +merge_throttling_policy.max_window_size int default=128 +merge_throttling_policy.window_size_increment double default=2.0 + ## If the persistence provider indicates that it has exhausted one or more ## of its internal resources during a mutating operation, new merges will ## be bounced for this duration. 
Not allowing further merges helps take diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 2a30acb1a74..4b4ff6c7e37 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -5,7 +5,7 @@ #include <vespa/storage/common/dummy_mbus_messages.h> #include <vespa/storage/persistence/messages.h> #include <vespa/vdslib/state/clusterstate.h> -#include <vespa/messagebus/message.h> +#include <vespa/messagebus/dynamicthrottlepolicy.h> #include <vespa/messagebus/error.h> #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -16,6 +16,8 @@ #include <vespa/log/log.h> LOG_SETUP(".mergethrottler"); +using vespa::config::content::core::StorServerConfig; + namespace storage { namespace { @@ -126,7 +128,7 @@ MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketComma } uint16_t -MergeThrottler::MergeNodeSequence::getNextNodeInChain() const +MergeThrottler::MergeNodeSequence::getNextNodeInChain() const noexcept { assert(_cmd.getChain().size() < _sortedNodes.size()); if (_use_unordered_forwarding) { @@ -144,7 +146,7 @@ MergeThrottler::MergeNodeSequence::getNextNodeInChain() const } bool -MergeThrottler::MergeNodeSequence::isChainCompleted() const +MergeThrottler::MergeNodeSequence::isChainCompleted() const noexcept { if (_cmd.getChain().size() != _sortedNodes.size()) return false; @@ -179,7 +181,7 @@ MergeThrottler::MergeThrottler( _merges(), _queue(), _maxQueueSize(1024), - _throttlePolicy(std::make_unique<mbus::StaticThrottlePolicy>()), + _throttlePolicy(std::make_unique<mbus::DynamicThrottlePolicy>()), _queueSequence(0), _messageLock(), _stateLock(), @@ -187,14 +189,16 @@ MergeThrottler::MergeThrottler( _metrics(std::make_unique<Metrics>()), _component(compReg, "mergethrottler"), _thread(), - _rendezvous(RENDEZVOUS_NONE), + 
_rendezvous(RendezvousState::NONE), _throttle_until_time(), _backpressure_duration(std::chrono::seconds(30)), + _use_dynamic_throttling(false), _disable_queue_limits_for_chained_merges(false), _closing(false) { - _throttlePolicy->setMaxPendingCount(20); - _configFetcher.subscribe<vespa::config::content::core::StorServerConfig>(configUri.getConfigId(), this); + _throttlePolicy->setMinWindowSize(20); + _throttlePolicy->setMaxWindowSize(20); + _configFetcher.subscribe<StorServerConfig>(configUri.getConfigId(), this); _configFetcher.start(); _component.registerStatusPage(*this); _component.registerMetric(*_metrics); @@ -204,7 +208,8 @@ void MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) { std::lock_guard lock(_stateLock); - + _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type + == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC); if (newConfig->maxMergesPerNode < 1) { throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } @@ -214,12 +219,22 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } - if (static_cast<double>(newConfig->maxMergesPerNode) - != _throttlePolicy->getMaxPendingCount()) - { - LOG(debug, "Setting new max pending count from max_merges_per_node: %d", - newConfig->maxMergesPerNode); - _throttlePolicy->setMaxPendingCount(newConfig->maxMergesPerNode); + if (_use_dynamic_throttling) { + auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1); + auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1); + if (min_win_sz > max_win_sz) { + min_win_sz = max_win_sz; + } + auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement); + _throttlePolicy->setMinWindowSize(min_win_sz); + 
_throttlePolicy->setMaxWindowSize(max_win_sz); + _throttlePolicy->setWindowSizeIncrement(win_sz_increment); + LOG(debug, "Using dynamic throttling window min/max [%d, %d], win size increment %.2g", + min_win_sz, max_win_sz, win_sz_increment); + } else { + // Use legacy config values when static throttling is enabled. + _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode); + _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode); } LOG(debug, "Setting new max queue size to %d", newConfig->maxMergeQueueSize); @@ -573,16 +588,16 @@ MergeThrottler::processQueuedMerges(MessageGuard& msgGuard) void MergeThrottler::handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { - if (_rendezvous != RENDEZVOUS_NONE) { + if (_rendezvous != RendezvousState::NONE) { LOG(spam, "rendezvous requested by external thread; establishing"); - assert(_rendezvous == RENDEZVOUS_REQUESTED); - _rendezvous = RENDEZVOUS_ESTABLISHED; + assert(_rendezvous == RendezvousState::REQUESTED); + _rendezvous = RendezvousState::ESTABLISHED; cond.notify_all(); - while (_rendezvous != RENDEZVOUS_RELEASED) { + while (_rendezvous != RendezvousState::RELEASED) { cond.wait(guard); } LOG(spam, "external thread rendezvous released"); - _rendezvous = RENDEZVOUS_NONE; + _rendezvous = RendezvousState::NONE; cond.notify_all(); } } @@ -604,7 +619,7 @@ MergeThrottler::run(framework::ThreadHandle& thread) while (_messagesDown.empty() && _messagesUp.empty() && !thread.interrupted() - && _rendezvous == RENDEZVOUS_NONE) + && _rendezvous == RendezvousState::NONE) { _messageCond.wait_for(msgLock, 1000ms); thread.registerTick(framework::WAIT_CYCLE); @@ -683,10 +698,20 @@ bool MergeThrottler::backpressure_mode_active() const { return backpressure_mode_active_no_lock(); } -bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept { +bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept { // We 
cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. // See comment in may_allow_into_queue() for rationale. - return (cmd.use_unordered_forwarding() && !cmd.from_distributor()); + if (!cmd.use_unordered_forwarding() || cmd.from_distributor()) { + return false; + } + // We'll only get here if we're dealing with an unordered merge that has been forwarded + // from another content node. In other words, it's a merge we want to handle immediately + // instead of deferring in the queue for later processing. We already know that the merge + // window is full, so we must either allow it in regardless or bounce it back. The latter + // makes the most sense when dynamic throttling is enabled, as NACKed replies count + // _against_ incrementing the throttling window, thereby implicitly helping to reduce the + // merge pressure generated by other nodes. + return !_use_dynamic_throttling; } bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept { @@ -1155,10 +1180,10 @@ void MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { LOG(spam, "establishing rendezvous with worker thread"); - assert(_rendezvous == RENDEZVOUS_NONE); - _rendezvous = RENDEZVOUS_REQUESTED; + assert(_rendezvous == RendezvousState::NONE); + _rendezvous = RendezvousState::REQUESTED; cond.notify_all(); - while (_rendezvous != RENDEZVOUS_ESTABLISHED) { + while (_rendezvous != RendezvousState::ESTABLISHED) { cond.wait(guard); } LOG(spam, "rendezvous established with worker thread"); @@ -1167,9 +1192,9 @@ MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, void MergeThrottler::releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { - _rendezvous = RENDEZVOUS_RELEASED; + _rendezvous = RendezvousState::RELEASED; cond.notify_all(); - while (_rendezvous != RENDEZVOUS_NONE) { + while (_rendezvous != 
RendezvousState::NONE) { cond.wait(guard); } } @@ -1288,57 +1313,62 @@ MergeThrottler::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath&) const { std::lock_guard lock(_stateLock); - { - out << "<p>Max pending: " + if (_use_dynamic_throttling) { + out << "<p>Dynamic throttle policy; window size min/max: [" + << _throttlePolicy->getMinWindowSize() << ", " + << _throttlePolicy->getMaxWindowSize() + << "], current window size: " + << _throttlePolicy->getMaxPendingCount() + << "</p>\n"; + } else { + out << "<p>Static throttle policy; max pending: " << _throttlePolicy->getMaxPendingCount() << "</p>\n"; - out << "<p>Please see node metrics for performance numbers</p>\n"; - out << "<h3>Active merges (" - << _merges.size() - << ")</h3>\n"; - if (!_merges.empty()) { - out << "<ul>\n"; - for (auto& m : _merges) { - out << "<li>" << m.second.getMergeCmdString(); - if (m.second.isExecutingLocally()) { - out << " <strong>("; - if (m.second.isInCycle()) { - out << "cycled - "; - } else if (m.second.isCycleBroken()) { - out << "broken cycle (another node in the chain likely went down) - "; - } - out << "executing on this node)</strong>"; - } else if (m.second.isUnwinding()) { - out << " <strong>(was executed here, now unwinding)</strong>"; - } - if (m.second.isAborted()) { - out << " <strong>aborted</strong>"; + } + out << "<p>Please see node metrics for performance numbers</p>\n"; + out << "<h3>Active merges (" + << _merges.size() + << ")</h3>\n"; + if (!_merges.empty()) { + out << "<ul>\n"; + for (auto& m : _merges) { + out << "<li>" << m.second.getMergeCmdString(); + if (m.second.isExecutingLocally()) { + out << " <strong>("; + if (m.second.isInCycle()) { + out << "cycled - "; + } else if (m.second.isCycleBroken()) { + out << "broken cycle (another node in the chain likely went down) - "; } - out << "</li>\n"; + out << "executing on this node)</strong>"; + } else if (m.second.isUnwinding()) { + out << " <strong>(was executed here, now unwinding)</strong>"; } - 
out << "</ul>\n"; - } else { - out << "<p>None</p>\n"; + if (m.second.isAborted()) { + out << " <strong>aborted</strong>"; + } + out << "</li>\n"; } + out << "</ul>\n"; + } else { + out << "<p>None</p>\n"; } - { - out << "<h3>Queued merges (in priority order) (" - << _queue.size() - << ")</h3>\n"; - if (!_queue.empty()) { - out << "<ol>\n"; - for (auto& qm : _queue) { - // The queue always owns its messages, thus this is safe - out << "<li>Pri " - << static_cast<unsigned int>(qm._msg->getPriority()) - << ": " << *qm._msg; - out << "</li>\n"; - } - out << "</ol>\n"; - } else { - out << "<p>None</p>\n"; + out << "<h3>Queued merges (in priority order) (" + << _queue.size() + << ")</h3>\n"; + if (!_queue.empty()) { + out << "<ol>\n"; + for (auto& qm : _queue) { + // The queue always owns its messages, thus this is safe + out << "<li>Pri " + << static_cast<unsigned int>(qm._msg->getPriority()) + << ": " << *qm._msg; + out << "</li>\n"; } + out << "</ol>\n"; + } else { + out << "<p>None</p>\n"; } } diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index c115d36ad89..b953595c91e 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -15,7 +15,6 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/document/bucket/bucket.h> #include <vespa/vespalib/util/document_runnable.h> -#include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/metrics/metricset.h> #include <vespa/metrics/summetric.h> #include <vespa/metrics/countmetric.h> @@ -24,6 +23,10 @@ #include <vespa/config/config.h> #include <chrono> +namespace mbus { +class DynamicThrottlePolicy; +} + namespace storage { class AbortBucketOperationsCommand; @@ -47,7 +50,7 @@ public: metrics::LongCountMetric rejected; metrics::LongCountMetric other; - MergeFailureMetrics(metrics::MetricSet* owner); + explicit MergeFailureMetrics(metrics::MetricSet* 
owner); ~MergeFailureMetrics() override; }; @@ -69,8 +72,8 @@ public: MergeOperationMetrics chaining; MergeOperationMetrics local; - Metrics(metrics::MetricSet* owner = 0); - ~Metrics(); + explicit Metrics(metrics::MetricSet* owner = nullptr); + ~Metrics() override; }; private: @@ -114,14 +117,14 @@ private: bool _aborted; ChainedMergeState(); - ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false); + explicit ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false); ~ChainedMergeState(); // Use default copy-constructor/assignment operator - bool isExecutingLocally() const { return _executingLocally; } - void setExecutingLocally(bool execLocally) { _executingLocally = execLocally; } + bool isExecutingLocally() const noexcept { return _executingLocally; } + void setExecutingLocally(bool execLocally) noexcept { _executingLocally = execLocally; } - const api::StorageMessage::SP& getMergeCmd() const { return _cmd; } + const api::StorageMessage::SP& getMergeCmd() const noexcept { return _cmd; } void setMergeCmd(const api::StorageMessage::SP& cmd) { _cmd = cmd; if (cmd.get()) { @@ -129,40 +132,40 @@ private: } } - bool isInCycle() const { return _inCycle; } - void setInCycle(bool inCycle) { _inCycle = inCycle; } + bool isInCycle() const noexcept { return _inCycle; } + void setInCycle(bool inCycle) noexcept { _inCycle = inCycle; } - bool isUnwinding() const { return _unwinding; } - void setUnwinding(bool unwinding) { _unwinding = unwinding; } + bool isUnwinding() const noexcept { return _unwinding; } + void setUnwinding(bool unwinding) noexcept { _unwinding = unwinding; } - bool isCycleBroken() const { return _cycleBroken; } - void setCycleBroken(bool cycleBroken) { _cycleBroken = cycleBroken; } + bool isCycleBroken() const noexcept { return _cycleBroken; } + void setCycleBroken(bool cycleBroken) noexcept { _cycleBroken = cycleBroken; } - bool isAborted() const { return _aborted; } - void setAborted(bool aborted) { _aborted = 
aborted; } + bool isAborted() const noexcept { return _aborted; } + void setAborted(bool aborted) noexcept { _aborted = aborted; } - const std::string& getMergeCmdString() const { return _cmdString; } + const std::string& getMergeCmdString() const noexcept { return _cmdString; } }; - typedef std::map<document::Bucket, ChainedMergeState> ActiveMergeMap; + using ActiveMergeMap = std::map<document::Bucket, ChainedMergeState>; // Use a set rather than a priority_queue, since we want to be // able to iterate over the collection during status rendering - typedef std::set< + using MergePriorityQueue = std::set< StablePriorityOrderingWrapper<api::StorageMessage::SP> - > MergePriorityQueue; + >; - enum RendezvousState { - RENDEZVOUS_NONE, - RENDEZVOUS_REQUESTED, - RENDEZVOUS_ESTABLISHED, - RENDEZVOUS_RELEASED + enum class RendezvousState { + NONE, + REQUESTED, + ESTABLISHED, + RELEASED }; ActiveMergeMap _merges; MergePriorityQueue _queue; size_t _maxQueueSize; - mbus::StaticThrottlePolicy::UP _throttlePolicy; + std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy; uint64_t _queueSequence; // TODO: move into a stable priority queue class mutable std::mutex _messageLock; std::condition_variable _messageCond; @@ -177,6 +180,7 @@ private: RendezvousState _rendezvous; mutable std::chrono::steady_clock::time_point _throttle_until_time; std::chrono::steady_clock::duration _backpressure_duration; + bool _use_dynamic_throttling; bool _disable_queue_limits_for_chained_merges; bool _closing; public: @@ -213,8 +217,8 @@ public: // For unit testing only const MergePriorityQueue& getMergeQueue() const { return _queue; } // For unit testing only - const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } - mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } + const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } + mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } 
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; // For unit testing only std::mutex& getStateLock() { return _stateLock; } @@ -237,26 +241,26 @@ private: MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex); - const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const { + [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const noexcept { return _sortedNodes; } - bool isIndexUnknown() const { + [[nodiscard]] bool isIndexUnknown() const noexcept { return (_sortedIndex == UINT16_MAX); } /** * This node is the merge executor if it's the first element in the * _unsorted_ node sequence. */ - bool isMergeExecutor() const { + [[nodiscard]] bool isMergeExecutor() const noexcept { return (_cmd.getNodes()[0].index == _thisIndex); } - uint16_t getExecutorNodeIndex() const{ + [[nodiscard]] uint16_t getExecutorNodeIndex() const noexcept { return _cmd.getNodes()[0].index; } - const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept { + [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept { return _cmd.getNodes(); } - [[nodiscard]] bool isLastNode() const { + [[nodiscard]] bool isLastNode() const noexcept { if (!_use_unordered_forwarding) { return (_sortedIndex == _sortedNodes.size() - 1); } else { @@ -267,13 +271,13 @@ private: /** * Gets node to forward to in strictly increasing order. 
*/ - uint16_t getNextNodeInChain() const; + [[nodiscard]] uint16_t getNextNodeInChain() const noexcept; /** * Returns true iff the chain vector (which is implicitly sorted) * pairwise compares equally to the vector of sorted node indices */ - bool isChainCompleted() const; + [[nodiscard]] bool isChainCompleted() const noexcept; }; /** @@ -359,7 +363,7 @@ private: [[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const; [[nodiscard]] bool backpressure_mode_active_no_lock() const; void backpressure_bounce_all_queued_merges(MessageGuard& guard); - [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept; + [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept; [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept; void sendReply(const api::MergeBucketCommand& cmd, |