summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-24 08:45:51 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-24 14:21:17 +0000
commitb3f3f67862dc34b753e93fb614a9fa1e84b68b3f (patch)
tree0a04d8c0c9a03cef81e3b83c72bd9a5ef8cdca49 /storage
parent418c89cd87e29b21bc3e1156203d79dfef1fe424 (diff)
Support use of dynamic throttling in MergeThrottler component
Now always uses `DynamicThrottlePolicy` instead of `StaticThrottlePolicy`, but by default hardwires the min/max limits to _functionally_ act as if a static policy were in place. If dynamic throttling is enabled, forwarded (unordered) merges are now busy-bounced instead of immediately accepted into the pending window if the window size is exhausted. This so that dynamic throttling windows on nodes earlier in the forwarding chain can be sized down implicitly (thus the _dynamic_ aspect of it all). This is done to help control resource usage and content node queue sizes. Dynamic throttling may be enabled/disabled live via config. If enabled, the following (live) config is supported: * Minimum window size * Maximum window size * Window size increment More may be added (and defaults changed) as appropriate.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp59
-rw-r--r--storage/src/vespa/storage/config/stor-server.def10
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp170
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h78
4 files changed, 182 insertions, 135 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 0f844ab6b4f..89b769078cc 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,18 +1,19 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/storageserver/mergethrottler.h>
-#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test {
assert(merge);
return merge;
}
+
+ [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept;
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -212,13 +215,10 @@ bool
checkChain(const StorageMessage::SP& msg,
Iterator first, Iterator end)
{
- const MergeBucketCommand& cmd =
- dynamic_cast<const MergeBucketCommand&>(*msg);
-
+ auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg);
if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
-
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
@@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
}
+uint32_t
+MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept
+{
+ return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize());
+}
+
// Extremely simple test that just checks that (min|max)_merges_per_node
// under the stor-server config gets propagated to all the nodes
TEST_F(MergeThrottlerTest, merges_config) {
for (int i = 0; i < _storageNodeCount; ++i) {
- EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount());
+ EXPECT_EQ(25, throttler_max_merges_pending(i));
EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize());
}
}
@@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
for (size_t i = 0; i < maxPending; ++i) {
@@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
// in the queue for a merge that is already known.
TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) {
// order.
TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
DummyStorageLink& bottomLink(*_bottomLinks[1]);
// Fill up all active merges and then 3 queued ones
- size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(1);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
@@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
// Second, test that we get rejected before queueing up. This is to
// avoid a hypothetical deadlock scenario.
// Fill up all active merges
- {
-
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- for (size_t i = 0; i < maxPending; ++i) {
- auto fillCmd = std::make_shared<MergeBucketCommand>(
- makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
- _topLinks[0]->sendDown(fillCmd);
- }
+ size_t maxPending = throttler_max_merges_pending(0);
+ for (size_t i = 0; i < maxPending; ++i) {
+ auto fillCmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
+ _topLinks[0]->sendDown(fillCmd);
}
_topLinks[0]->sendDown(cmd);
@@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){
// Fill up all active merges and then 3 queued ones with the same
// system state
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// State is version 1. Send down several merges with state version 2.
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// TODO remove functionality and test
TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
}
TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) {
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
@@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
- size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
@@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
}
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
- size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(throttler_index);
for (size_t i = 0; i < max_pending + queued_count; ++i) {
_topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
.nodes(throttler_index, throttler_index + 1)
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index a368c2e5b6f..00be2929229 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -33,9 +33,19 @@ node_reliability int default=1 restart
## A merge operation will be chained through all nodes involved in the
## merge, only actually starting the operation when every node has
## allowed it to pass through.
+## NOTE: these config values are _not_ used if merge_throttling_policy.type
+## is configured to DYNAMIC (see below).
max_merges_per_node int default=16
max_merge_queue_size int default=100
+## Chooses the throttling policy used to control the active merge window size
+## of the MergeThrottler component.
+merge_throttling_policy.type enum { STATIC, DYNAMIC } default=STATIC
+## Only used if merge_throttling_policy.type == DYNAMIC:
+merge_throttling_policy.min_window_size int default=16
+merge_throttling_policy.max_window_size int default=128
+merge_throttling_policy.window_size_increment double default=2.0
+
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
## be bounced for this duration. Not allowing further merges helps take
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 2a30acb1a74..4b4ff6c7e37 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -5,7 +5,7 @@
#include <vespa/storage/common/dummy_mbus_messages.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/messagebus/message.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
#include <vespa/messagebus/error.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -16,6 +16,8 @@
#include <vespa/log/log.h>
LOG_SETUP(".mergethrottler");
+using vespa::config::content::core::StorServerConfig;
+
namespace storage {
namespace {
@@ -126,7 +128,7 @@ MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketComma
}
uint16_t
-MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
+MergeThrottler::MergeNodeSequence::getNextNodeInChain() const noexcept
{
assert(_cmd.getChain().size() < _sortedNodes.size());
if (_use_unordered_forwarding) {
@@ -144,7 +146,7 @@ MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
}
bool
-MergeThrottler::MergeNodeSequence::isChainCompleted() const
+MergeThrottler::MergeNodeSequence::isChainCompleted() const noexcept
{
if (_cmd.getChain().size() != _sortedNodes.size()) return false;
@@ -179,7 +181,7 @@ MergeThrottler::MergeThrottler(
_merges(),
_queue(),
_maxQueueSize(1024),
- _throttlePolicy(std::make_unique<mbus::StaticThrottlePolicy>()),
+ _throttlePolicy(std::make_unique<mbus::DynamicThrottlePolicy>()),
_queueSequence(0),
_messageLock(),
_stateLock(),
@@ -187,14 +189,16 @@ MergeThrottler::MergeThrottler(
_metrics(std::make_unique<Metrics>()),
_component(compReg, "mergethrottler"),
_thread(),
- _rendezvous(RENDEZVOUS_NONE),
+ _rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
{
- _throttlePolicy->setMaxPendingCount(20);
- _configFetcher.subscribe<vespa::config::content::core::StorServerConfig>(configUri.getConfigId(), this);
+ _throttlePolicy->setMinWindowSize(20);
+ _throttlePolicy->setMaxWindowSize(20);
+ _configFetcher.subscribe<StorServerConfig>(configUri.getConfigId(), this);
_configFetcher.start();
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
@@ -204,7 +208,8 @@ void
MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig)
{
std::lock_guard lock(_stateLock);
-
+ _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type
+ == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC);
if (newConfig->maxMergesPerNode < 1) {
throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
@@ -214,12 +219,22 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
- if (static_cast<double>(newConfig->maxMergesPerNode)
- != _throttlePolicy->getMaxPendingCount())
- {
- LOG(debug, "Setting new max pending count from max_merges_per_node: %d",
- newConfig->maxMergesPerNode);
- _throttlePolicy->setMaxPendingCount(newConfig->maxMergesPerNode);
+ if (_use_dynamic_throttling) {
+ auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1);
+ auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1);
+ if (min_win_sz > max_win_sz) {
+ min_win_sz = max_win_sz;
+ }
+ auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement);
+ _throttlePolicy->setMinWindowSize(min_win_sz);
+ _throttlePolicy->setMaxWindowSize(max_win_sz);
+ _throttlePolicy->setWindowSizeIncrement(win_sz_increment);
+ LOG(debug, "Using dynamic throttling window min/max [%d, %d], win size increment %.2g",
+ min_win_sz, max_win_sz, win_sz_increment);
+ } else {
+ // Use legacy config values when static throttling is enabled.
+ _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode);
+ _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode);
}
LOG(debug, "Setting new max queue size to %d",
newConfig->maxMergeQueueSize);
@@ -573,16 +588,16 @@ MergeThrottler::processQueuedMerges(MessageGuard& msgGuard)
void
MergeThrottler::handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
- if (_rendezvous != RENDEZVOUS_NONE) {
+ if (_rendezvous != RendezvousState::NONE) {
LOG(spam, "rendezvous requested by external thread; establishing");
- assert(_rendezvous == RENDEZVOUS_REQUESTED);
- _rendezvous = RENDEZVOUS_ESTABLISHED;
+ assert(_rendezvous == RendezvousState::REQUESTED);
+ _rendezvous = RendezvousState::ESTABLISHED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_RELEASED) {
+ while (_rendezvous != RendezvousState::RELEASED) {
cond.wait(guard);
}
LOG(spam, "external thread rendezvous released");
- _rendezvous = RENDEZVOUS_NONE;
+ _rendezvous = RendezvousState::NONE;
cond.notify_all();
}
}
@@ -604,7 +619,7 @@ MergeThrottler::run(framework::ThreadHandle& thread)
while (_messagesDown.empty()
&& _messagesUp.empty()
&& !thread.interrupted()
- && _rendezvous == RENDEZVOUS_NONE)
+ && _rendezvous == RendezvousState::NONE)
{
_messageCond.wait_for(msgLock, 1000ms);
thread.registerTick(framework::WAIT_CYCLE);
@@ -683,10 +698,20 @@ bool MergeThrottler::backpressure_mode_active() const {
return backpressure_mode_active_no_lock();
}
-bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept {
+bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// See comment in may_allow_into_queue() for rationale.
- return (cmd.use_unordered_forwarding() && !cmd.from_distributor());
+ if (!cmd.use_unordered_forwarding() || cmd.from_distributor()) {
+ return false;
+ }
+ // We'll only get here if we're dealing with an unordered merge that has been forwarded
+ // from another content node. In other words, it's a merge we want to handle immediately
+ // instead of deferring in the queue for later processing. We already know that the merge
+ // window is full, so we must either allow it in regardless or bounce it back. The latter
+ // makes the most sense when dynamic throttling is enabled, as NACKed replies count
+ // _against_ incrementing the throttling window, thereby implicitly helping to reduce the
+ // merge pressure generated by other nodes.
+ return !_use_dynamic_throttling;
}
bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
@@ -1155,10 +1180,10 @@ void
MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
LOG(spam, "establishing rendezvous with worker thread");
- assert(_rendezvous == RENDEZVOUS_NONE);
- _rendezvous = RENDEZVOUS_REQUESTED;
+ assert(_rendezvous == RendezvousState::NONE);
+ _rendezvous = RendezvousState::REQUESTED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_ESTABLISHED) {
+ while (_rendezvous != RendezvousState::ESTABLISHED) {
cond.wait(guard);
}
LOG(spam, "rendezvous established with worker thread");
@@ -1167,9 +1192,9 @@ MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard,
void
MergeThrottler::releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
- _rendezvous = RENDEZVOUS_RELEASED;
+ _rendezvous = RendezvousState::RELEASED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_NONE) {
+ while (_rendezvous != RendezvousState::NONE) {
cond.wait(guard);
}
}
@@ -1288,57 +1313,62 @@ MergeThrottler::reportHtmlStatus(std::ostream& out,
const framework::HttpUrlPath&) const
{
std::lock_guard lock(_stateLock);
- {
- out << "<p>Max pending: "
+ if (_use_dynamic_throttling) {
+ out << "<p>Dynamic throttle policy; window size min/max: ["
+ << _throttlePolicy->getMinWindowSize() << ", "
+ << _throttlePolicy->getMaxWindowSize()
+ << "], current window size: "
+ << _throttlePolicy->getMaxPendingCount()
+ << "</p>\n";
+ } else {
+ out << "<p>Static throttle policy; max pending: "
<< _throttlePolicy->getMaxPendingCount()
<< "</p>\n";
- out << "<p>Please see node metrics for performance numbers</p>\n";
- out << "<h3>Active merges ("
- << _merges.size()
- << ")</h3>\n";
- if (!_merges.empty()) {
- out << "<ul>\n";
- for (auto& m : _merges) {
- out << "<li>" << m.second.getMergeCmdString();
- if (m.second.isExecutingLocally()) {
- out << " <strong>(";
- if (m.second.isInCycle()) {
- out << "cycled - ";
- } else if (m.second.isCycleBroken()) {
- out << "broken cycle (another node in the chain likely went down) - ";
- }
- out << "executing on this node)</strong>";
- } else if (m.second.isUnwinding()) {
- out << " <strong>(was executed here, now unwinding)</strong>";
- }
- if (m.second.isAborted()) {
- out << " <strong>aborted</strong>";
+ }
+ out << "<p>Please see node metrics for performance numbers</p>\n";
+ out << "<h3>Active merges ("
+ << _merges.size()
+ << ")</h3>\n";
+ if (!_merges.empty()) {
+ out << "<ul>\n";
+ for (auto& m : _merges) {
+ out << "<li>" << m.second.getMergeCmdString();
+ if (m.second.isExecutingLocally()) {
+ out << " <strong>(";
+ if (m.second.isInCycle()) {
+ out << "cycled - ";
+ } else if (m.second.isCycleBroken()) {
+ out << "broken cycle (another node in the chain likely went down) - ";
}
- out << "</li>\n";
+ out << "executing on this node)</strong>";
+ } else if (m.second.isUnwinding()) {
+ out << " <strong>(was executed here, now unwinding)</strong>";
}
- out << "</ul>\n";
- } else {
- out << "<p>None</p>\n";
+ if (m.second.isAborted()) {
+ out << " <strong>aborted</strong>";
+ }
+ out << "</li>\n";
}
+ out << "</ul>\n";
+ } else {
+ out << "<p>None</p>\n";
}
- {
- out << "<h3>Queued merges (in priority order) ("
- << _queue.size()
- << ")</h3>\n";
- if (!_queue.empty()) {
- out << "<ol>\n";
- for (auto& qm : _queue) {
- // The queue always owns its messages, thus this is safe
- out << "<li>Pri "
- << static_cast<unsigned int>(qm._msg->getPriority())
- << ": " << *qm._msg;
- out << "</li>\n";
- }
- out << "</ol>\n";
- } else {
- out << "<p>None</p>\n";
+ out << "<h3>Queued merges (in priority order) ("
+ << _queue.size()
+ << ")</h3>\n";
+ if (!_queue.empty()) {
+ out << "<ol>\n";
+ for (auto& qm : _queue) {
+ // The queue always owns its messages, thus this is safe
+ out << "<li>Pri "
+ << static_cast<unsigned int>(qm._msg->getPriority())
+ << ": " << *qm._msg;
+ out << "</li>\n";
}
+ out << "</ol>\n";
+ } else {
+ out << "<p>None</p>\n";
}
}
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index c115d36ad89..b953595c91e 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -15,7 +15,6 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
#include <vespa/metrics/countmetric.h>
@@ -24,6 +23,10 @@
#include <vespa/config/config.h>
#include <chrono>
+namespace mbus {
+class DynamicThrottlePolicy;
+}
+
namespace storage {
class AbortBucketOperationsCommand;
@@ -47,7 +50,7 @@ public:
metrics::LongCountMetric rejected;
metrics::LongCountMetric other;
- MergeFailureMetrics(metrics::MetricSet* owner);
+ explicit MergeFailureMetrics(metrics::MetricSet* owner);
~MergeFailureMetrics() override;
};
@@ -69,8 +72,8 @@ public:
MergeOperationMetrics chaining;
MergeOperationMetrics local;
- Metrics(metrics::MetricSet* owner = 0);
- ~Metrics();
+ explicit Metrics(metrics::MetricSet* owner = nullptr);
+ ~Metrics() override;
};
private:
@@ -114,14 +117,14 @@ private:
bool _aborted;
ChainedMergeState();
- ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
+ explicit ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
~ChainedMergeState();
// Use default copy-constructor/assignment operator
- bool isExecutingLocally() const { return _executingLocally; }
- void setExecutingLocally(bool execLocally) { _executingLocally = execLocally; }
+ bool isExecutingLocally() const noexcept { return _executingLocally; }
+ void setExecutingLocally(bool execLocally) noexcept { _executingLocally = execLocally; }
- const api::StorageMessage::SP& getMergeCmd() const { return _cmd; }
+ const api::StorageMessage::SP& getMergeCmd() const noexcept { return _cmd; }
void setMergeCmd(const api::StorageMessage::SP& cmd) {
_cmd = cmd;
if (cmd.get()) {
@@ -129,40 +132,40 @@ private:
}
}
- bool isInCycle() const { return _inCycle; }
- void setInCycle(bool inCycle) { _inCycle = inCycle; }
+ bool isInCycle() const noexcept { return _inCycle; }
+ void setInCycle(bool inCycle) noexcept { _inCycle = inCycle; }
- bool isUnwinding() const { return _unwinding; }
- void setUnwinding(bool unwinding) { _unwinding = unwinding; }
+ bool isUnwinding() const noexcept { return _unwinding; }
+ void setUnwinding(bool unwinding) noexcept { _unwinding = unwinding; }
- bool isCycleBroken() const { return _cycleBroken; }
- void setCycleBroken(bool cycleBroken) { _cycleBroken = cycleBroken; }
+ bool isCycleBroken() const noexcept { return _cycleBroken; }
+ void setCycleBroken(bool cycleBroken) noexcept { _cycleBroken = cycleBroken; }
- bool isAborted() const { return _aborted; }
- void setAborted(bool aborted) { _aborted = aborted; }
+ bool isAborted() const noexcept { return _aborted; }
+ void setAborted(bool aborted) noexcept { _aborted = aborted; }
- const std::string& getMergeCmdString() const { return _cmdString; }
+ const std::string& getMergeCmdString() const noexcept { return _cmdString; }
};
- typedef std::map<document::Bucket, ChainedMergeState> ActiveMergeMap;
+ using ActiveMergeMap = std::map<document::Bucket, ChainedMergeState>;
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
- typedef std::set<
+ using MergePriorityQueue = std::set<
StablePriorityOrderingWrapper<api::StorageMessage::SP>
- > MergePriorityQueue;
+ >;
- enum RendezvousState {
- RENDEZVOUS_NONE,
- RENDEZVOUS_REQUESTED,
- RENDEZVOUS_ESTABLISHED,
- RENDEZVOUS_RELEASED
+ enum class RendezvousState {
+ NONE,
+ REQUESTED,
+ ESTABLISHED,
+ RELEASED
};
ActiveMergeMap _merges;
MergePriorityQueue _queue;
size_t _maxQueueSize;
- mbus::StaticThrottlePolicy::UP _throttlePolicy;
+ std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
uint64_t _queueSequence; // TODO: move into a stable priority queue class
mutable std::mutex _messageLock;
std::condition_variable _messageCond;
@@ -177,6 +180,7 @@ private:
RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
std::chrono::steady_clock::duration _backpressure_duration;
+ bool _use_dynamic_throttling;
bool _disable_queue_limits_for_chained_merges;
bool _closing;
public:
@@ -213,8 +217,8 @@ public:
// For unit testing only
const MergePriorityQueue& getMergeQueue() const { return _queue; }
// For unit testing only
- const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
- mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
+ const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
+ mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -237,26 +241,26 @@ private:
MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
- const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
+ [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const noexcept {
return _sortedNodes;
}
- bool isIndexUnknown() const {
+ [[nodiscard]] bool isIndexUnknown() const noexcept {
return (_sortedIndex == UINT16_MAX);
}
/**
* This node is the merge executor if it's the first element in the
* _unsorted_ node sequence.
*/
- bool isMergeExecutor() const {
+ [[nodiscard]] bool isMergeExecutor() const noexcept {
return (_cmd.getNodes()[0].index == _thisIndex);
}
- uint16_t getExecutorNodeIndex() const{
+ [[nodiscard]] uint16_t getExecutorNodeIndex() const noexcept {
return _cmd.getNodes()[0].index;
}
- const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
+ [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
return _cmd.getNodes();
}
- [[nodiscard]] bool isLastNode() const {
+ [[nodiscard]] bool isLastNode() const noexcept {
if (!_use_unordered_forwarding) {
return (_sortedIndex == _sortedNodes.size() - 1);
} else {
@@ -267,13 +271,13 @@ private:
/**
* Gets node to forward to in strictly increasing order.
*/
- uint16_t getNextNodeInChain() const;
+ [[nodiscard]] uint16_t getNextNodeInChain() const noexcept;
/**
* Returns true iff the chain vector (which is implicitly sorted)
* pairwise compares equally to the vector of sorted node indices
*/
- bool isChainCompleted() const;
+ [[nodiscard]] bool isChainCompleted() const noexcept;
};
/**
@@ -359,7 +363,7 @@ private:
[[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
[[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
- [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept;
+ [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,