summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp59
-rw-r--r--storage/src/vespa/storage/config/stor-server.def10
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp170
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h78
4 files changed, 182 insertions, 135 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 0f844ab6b4f..89b769078cc 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,18 +1,19 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/storageserver/mergethrottler.h>
-#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -159,6 +160,8 @@ struct MergeThrottlerTest : Test {
assert(merge);
return merge;
}
+
+ [[nodiscard]] uint32_t throttler_max_merges_pending(uint16_t throttler_index) const noexcept;
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -212,13 +215,10 @@ bool
checkChain(const StorageMessage::SP& msg,
Iterator first, Iterator end)
{
- const MergeBucketCommand& cmd =
- dynamic_cast<const MergeBucketCommand&>(*msg);
-
+ auto& cmd = dynamic_cast<const MergeBucketCommand&>(*msg);
if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
-
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
@@ -247,11 +247,17 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
}
+uint32_t
+MergeThrottlerTest::throttler_max_merges_pending(uint16_t throttler_index) const noexcept
+{
+ return static_cast<uint32_t>(_throttlers[throttler_index]->getThrottlePolicy().getMaxWindowSize());
+}
+
// Extremely simple test that just checks that (min|max)_merges_per_node
// under the stor-server config gets propagated to all the nodes
TEST_F(MergeThrottlerTest, merges_config) {
for (int i = 0; i < _storageNodeCount; ++i) {
- EXPECT_EQ(25, _throttlers[i]->getThrottlePolicy().getMaxPendingCount());
+ EXPECT_EQ(25, throttler_max_merges_pending(i));
EXPECT_EQ(20, _throttlers[i]->getMaxQueueSize());
}
}
@@ -636,7 +642,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
for (size_t i = 0; i < maxPending; ++i) {
@@ -691,7 +697,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
// in the queue for a merge that is already known.
TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -786,7 +792,7 @@ TEST_F(MergeThrottlerTest, invalid_receiver_node) {
// order.
TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
@@ -846,7 +852,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
DummyStorageLink& bottomLink(*_bottomLinks[1]);
// Fill up all active merges and then 3 queued ones
- size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(1);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
@@ -914,7 +920,7 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 3; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -972,14 +978,11 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
// Second, test that we get rejected before queueing up. This is to
// avoid a hypothetical deadlock scenario.
// Fill up all active merges
- {
-
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- for (size_t i = 0; i < maxPending; ++i) {
- auto fillCmd = std::make_shared<MergeBucketCommand>(
- makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
- _topLinks[0]->sendDown(fillCmd);
- }
+ size_t maxPending = throttler_max_merges_pending(0);
+ for (size_t i = 0; i < maxPending; ++i) {
+ auto fillCmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
+ _topLinks[0]->sendDown(fillCmd);
}
_topLinks[0]->sendDown(cmd);
@@ -993,7 +996,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queued){
// Fill up all active merges and then 3 queued ones with the same
// system state
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1035,7 +1038,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// State is version 1. Send down several merges with state version 2.
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
for (size_t i = 0; i < maxPending + 3; ++i) {
@@ -1074,7 +1077,7 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
// TODO remove functionality and test
TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
ASSERT_LT(maxPending, 100);
for (size_t i = 0; i < maxPending + 1; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
@@ -1156,7 +1159,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
}
TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) {
- size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = throttler_max_merges_pending(0);
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
@@ -1205,7 +1208,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
- size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
@@ -1452,7 +1455,7 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
}
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
- size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ size_t max_pending = throttler_max_merges_pending(throttler_index);
for (size_t i = 0; i < max_pending + queued_count; ++i) {
_topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
.nodes(throttler_index, throttler_index + 1)
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index a368c2e5b6f..00be2929229 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -33,9 +33,19 @@ node_reliability int default=1 restart
## A merge operation will be chained through all nodes involved in the
## merge, only actually starting the operation when every node has
## allowed it to pass through.
+## NOTE: these config values are _not_ used if merge_throttling_policy.type
+## is configured to DYNAMIC (see below).
max_merges_per_node int default=16
max_merge_queue_size int default=100
+## Chooses the throttling policy used to control the active merge window size
+## of the MergeThrottler component.
+merge_throttling_policy.type enum { STATIC, DYNAMIC } default=STATIC
+## Only used if merge_throttling_policy.type == DYNAMIC:
+merge_throttling_policy.min_window_size int default=16
+merge_throttling_policy.max_window_size int default=128
+merge_throttling_policy.window_size_increment double default=2.0
+
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
## be bounced for this duration. Not allowing further merges helps take
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 2a30acb1a74..4b4ff6c7e37 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -5,7 +5,7 @@
#include <vespa/storage/common/dummy_mbus_messages.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/messagebus/message.h>
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
#include <vespa/messagebus/error.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -16,6 +16,8 @@
#include <vespa/log/log.h>
LOG_SETUP(".mergethrottler");
+using vespa::config::content::core::StorServerConfig;
+
namespace storage {
namespace {
@@ -126,7 +128,7 @@ MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketComma
}
uint16_t
-MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
+MergeThrottler::MergeNodeSequence::getNextNodeInChain() const noexcept
{
assert(_cmd.getChain().size() < _sortedNodes.size());
if (_use_unordered_forwarding) {
@@ -144,7 +146,7 @@ MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
}
bool
-MergeThrottler::MergeNodeSequence::isChainCompleted() const
+MergeThrottler::MergeNodeSequence::isChainCompleted() const noexcept
{
if (_cmd.getChain().size() != _sortedNodes.size()) return false;
@@ -179,7 +181,7 @@ MergeThrottler::MergeThrottler(
_merges(),
_queue(),
_maxQueueSize(1024),
- _throttlePolicy(std::make_unique<mbus::StaticThrottlePolicy>()),
+ _throttlePolicy(std::make_unique<mbus::DynamicThrottlePolicy>()),
_queueSequence(0),
_messageLock(),
_stateLock(),
@@ -187,14 +189,16 @@ MergeThrottler::MergeThrottler(
_metrics(std::make_unique<Metrics>()),
_component(compReg, "mergethrottler"),
_thread(),
- _rendezvous(RENDEZVOUS_NONE),
+ _rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
{
- _throttlePolicy->setMaxPendingCount(20);
- _configFetcher.subscribe<vespa::config::content::core::StorServerConfig>(configUri.getConfigId(), this);
+ _throttlePolicy->setMinWindowSize(20);
+ _throttlePolicy->setMaxWindowSize(20);
+ _configFetcher.subscribe<StorServerConfig>(configUri.getConfigId(), this);
_configFetcher.start();
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
@@ -204,7 +208,8 @@ void
MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig)
{
std::lock_guard lock(_stateLock);
-
+ _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type
+ == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC);
if (newConfig->maxMergesPerNode < 1) {
throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
@@ -214,12 +219,22 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
- if (static_cast<double>(newConfig->maxMergesPerNode)
- != _throttlePolicy->getMaxPendingCount())
- {
- LOG(debug, "Setting new max pending count from max_merges_per_node: %d",
- newConfig->maxMergesPerNode);
- _throttlePolicy->setMaxPendingCount(newConfig->maxMergesPerNode);
+ if (_use_dynamic_throttling) {
+ auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1);
+ auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1);
+ if (min_win_sz > max_win_sz) {
+ min_win_sz = max_win_sz;
+ }
+ auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement);
+ _throttlePolicy->setMinWindowSize(min_win_sz);
+ _throttlePolicy->setMaxWindowSize(max_win_sz);
+ _throttlePolicy->setWindowSizeIncrement(win_sz_increment);
+ LOG(debug, "Using dynamic throttling window min/max [%d, %d], win size increment %.2g",
+ min_win_sz, max_win_sz, win_sz_increment);
+ } else {
+ // Use legacy config values when static throttling is enabled.
+ _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode);
+ _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode);
}
LOG(debug, "Setting new max queue size to %d",
newConfig->maxMergeQueueSize);
@@ -573,16 +588,16 @@ MergeThrottler::processQueuedMerges(MessageGuard& msgGuard)
void
MergeThrottler::handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
- if (_rendezvous != RENDEZVOUS_NONE) {
+ if (_rendezvous != RendezvousState::NONE) {
LOG(spam, "rendezvous requested by external thread; establishing");
- assert(_rendezvous == RENDEZVOUS_REQUESTED);
- _rendezvous = RENDEZVOUS_ESTABLISHED;
+ assert(_rendezvous == RendezvousState::REQUESTED);
+ _rendezvous = RendezvousState::ESTABLISHED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_RELEASED) {
+ while (_rendezvous != RendezvousState::RELEASED) {
cond.wait(guard);
}
LOG(spam, "external thread rendezvous released");
- _rendezvous = RENDEZVOUS_NONE;
+ _rendezvous = RendezvousState::NONE;
cond.notify_all();
}
}
@@ -604,7 +619,7 @@ MergeThrottler::run(framework::ThreadHandle& thread)
while (_messagesDown.empty()
&& _messagesUp.empty()
&& !thread.interrupted()
- && _rendezvous == RENDEZVOUS_NONE)
+ && _rendezvous == RendezvousState::NONE)
{
_messageCond.wait_for(msgLock, 1000ms);
thread.registerTick(framework::WAIT_CYCLE);
@@ -683,10 +698,20 @@ bool MergeThrottler::backpressure_mode_active() const {
return backpressure_mode_active_no_lock();
}
-bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept {
+bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// See comment in may_allow_into_queue() for rationale.
- return (cmd.use_unordered_forwarding() && !cmd.from_distributor());
+ if (!cmd.use_unordered_forwarding() || cmd.from_distributor()) {
+ return false;
+ }
+ // We'll only get here if we're dealing with an unordered merge that has been forwarded
+ // from another content node. In other words, it's a merge we want to handle immediately
+ // instead of deferring in the queue for later processing. We already know that the merge
+ // window is full, so we must either allow it in regardless or bounce it back. The latter
+ // makes the most sense when dynamic throttling is enabled, as NACKed replies count
+ // _against_ incrementing the throttling window, thereby implicitly helping to reduce the
+ // merge pressure generated by other nodes.
+ return !_use_dynamic_throttling;
}
bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
@@ -1155,10 +1180,10 @@ void
MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
LOG(spam, "establishing rendezvous with worker thread");
- assert(_rendezvous == RENDEZVOUS_NONE);
- _rendezvous = RENDEZVOUS_REQUESTED;
+ assert(_rendezvous == RendezvousState::NONE);
+ _rendezvous = RendezvousState::REQUESTED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_ESTABLISHED) {
+ while (_rendezvous != RendezvousState::ESTABLISHED) {
cond.wait(guard);
}
LOG(spam, "rendezvous established with worker thread");
@@ -1167,9 +1192,9 @@ MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard,
void
MergeThrottler::releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
- _rendezvous = RENDEZVOUS_RELEASED;
+ _rendezvous = RendezvousState::RELEASED;
cond.notify_all();
- while (_rendezvous != RENDEZVOUS_NONE) {
+ while (_rendezvous != RendezvousState::NONE) {
cond.wait(guard);
}
}
@@ -1288,57 +1313,62 @@ MergeThrottler::reportHtmlStatus(std::ostream& out,
const framework::HttpUrlPath&) const
{
std::lock_guard lock(_stateLock);
- {
- out << "<p>Max pending: "
+ if (_use_dynamic_throttling) {
+ out << "<p>Dynamic throttle policy; window size min/max: ["
+ << _throttlePolicy->getMinWindowSize() << ", "
+ << _throttlePolicy->getMaxWindowSize()
+ << "], current window size: "
+ << _throttlePolicy->getMaxPendingCount()
+ << "</p>\n";
+ } else {
+ out << "<p>Static throttle policy; max pending: "
<< _throttlePolicy->getMaxPendingCount()
<< "</p>\n";
- out << "<p>Please see node metrics for performance numbers</p>\n";
- out << "<h3>Active merges ("
- << _merges.size()
- << ")</h3>\n";
- if (!_merges.empty()) {
- out << "<ul>\n";
- for (auto& m : _merges) {
- out << "<li>" << m.second.getMergeCmdString();
- if (m.second.isExecutingLocally()) {
- out << " <strong>(";
- if (m.second.isInCycle()) {
- out << "cycled - ";
- } else if (m.second.isCycleBroken()) {
- out << "broken cycle (another node in the chain likely went down) - ";
- }
- out << "executing on this node)</strong>";
- } else if (m.second.isUnwinding()) {
- out << " <strong>(was executed here, now unwinding)</strong>";
- }
- if (m.second.isAborted()) {
- out << " <strong>aborted</strong>";
+ }
+ out << "<p>Please see node metrics for performance numbers</p>\n";
+ out << "<h3>Active merges ("
+ << _merges.size()
+ << ")</h3>\n";
+ if (!_merges.empty()) {
+ out << "<ul>\n";
+ for (auto& m : _merges) {
+ out << "<li>" << m.second.getMergeCmdString();
+ if (m.second.isExecutingLocally()) {
+ out << " <strong>(";
+ if (m.second.isInCycle()) {
+ out << "cycled - ";
+ } else if (m.second.isCycleBroken()) {
+ out << "broken cycle (another node in the chain likely went down) - ";
}
- out << "</li>\n";
+ out << "executing on this node)</strong>";
+ } else if (m.second.isUnwinding()) {
+ out << " <strong>(was executed here, now unwinding)</strong>";
}
- out << "</ul>\n";
- } else {
- out << "<p>None</p>\n";
+ if (m.second.isAborted()) {
+ out << " <strong>aborted</strong>";
+ }
+ out << "</li>\n";
}
+ out << "</ul>\n";
+ } else {
+ out << "<p>None</p>\n";
}
- {
- out << "<h3>Queued merges (in priority order) ("
- << _queue.size()
- << ")</h3>\n";
- if (!_queue.empty()) {
- out << "<ol>\n";
- for (auto& qm : _queue) {
- // The queue always owns its messages, thus this is safe
- out << "<li>Pri "
- << static_cast<unsigned int>(qm._msg->getPriority())
- << ": " << *qm._msg;
- out << "</li>\n";
- }
- out << "</ol>\n";
- } else {
- out << "<p>None</p>\n";
+ out << "<h3>Queued merges (in priority order) ("
+ << _queue.size()
+ << ")</h3>\n";
+ if (!_queue.empty()) {
+ out << "<ol>\n";
+ for (auto& qm : _queue) {
+ // The queue always owns its messages, thus this is safe
+ out << "<li>Pri "
+ << static_cast<unsigned int>(qm._msg->getPriority())
+ << ": " << *qm._msg;
+ out << "</li>\n";
}
+ out << "</ol>\n";
+ } else {
+ out << "<p>None</p>\n";
}
}
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index c115d36ad89..b953595c91e 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -15,7 +15,6 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
#include <vespa/metrics/countmetric.h>
@@ -24,6 +23,10 @@
#include <vespa/config/config.h>
#include <chrono>
+namespace mbus {
+class DynamicThrottlePolicy;
+}
+
namespace storage {
class AbortBucketOperationsCommand;
@@ -47,7 +50,7 @@ public:
metrics::LongCountMetric rejected;
metrics::LongCountMetric other;
- MergeFailureMetrics(metrics::MetricSet* owner);
+ explicit MergeFailureMetrics(metrics::MetricSet* owner);
~MergeFailureMetrics() override;
};
@@ -69,8 +72,8 @@ public:
MergeOperationMetrics chaining;
MergeOperationMetrics local;
- Metrics(metrics::MetricSet* owner = 0);
- ~Metrics();
+ explicit Metrics(metrics::MetricSet* owner = nullptr);
+ ~Metrics() override;
};
private:
@@ -114,14 +117,14 @@ private:
bool _aborted;
ChainedMergeState();
- ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
+ explicit ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
~ChainedMergeState();
// Use default copy-constructor/assignment operator
- bool isExecutingLocally() const { return _executingLocally; }
- void setExecutingLocally(bool execLocally) { _executingLocally = execLocally; }
+ bool isExecutingLocally() const noexcept { return _executingLocally; }
+ void setExecutingLocally(bool execLocally) noexcept { _executingLocally = execLocally; }
- const api::StorageMessage::SP& getMergeCmd() const { return _cmd; }
+ const api::StorageMessage::SP& getMergeCmd() const noexcept { return _cmd; }
void setMergeCmd(const api::StorageMessage::SP& cmd) {
_cmd = cmd;
if (cmd.get()) {
@@ -129,40 +132,40 @@ private:
}
}
- bool isInCycle() const { return _inCycle; }
- void setInCycle(bool inCycle) { _inCycle = inCycle; }
+ bool isInCycle() const noexcept { return _inCycle; }
+ void setInCycle(bool inCycle) noexcept { _inCycle = inCycle; }
- bool isUnwinding() const { return _unwinding; }
- void setUnwinding(bool unwinding) { _unwinding = unwinding; }
+ bool isUnwinding() const noexcept { return _unwinding; }
+ void setUnwinding(bool unwinding) noexcept { _unwinding = unwinding; }
- bool isCycleBroken() const { return _cycleBroken; }
- void setCycleBroken(bool cycleBroken) { _cycleBroken = cycleBroken; }
+ bool isCycleBroken() const noexcept { return _cycleBroken; }
+ void setCycleBroken(bool cycleBroken) noexcept { _cycleBroken = cycleBroken; }
- bool isAborted() const { return _aborted; }
- void setAborted(bool aborted) { _aborted = aborted; }
+ bool isAborted() const noexcept { return _aborted; }
+ void setAborted(bool aborted) noexcept { _aborted = aborted; }
- const std::string& getMergeCmdString() const { return _cmdString; }
+ const std::string& getMergeCmdString() const noexcept { return _cmdString; }
};
- typedef std::map<document::Bucket, ChainedMergeState> ActiveMergeMap;
+ using ActiveMergeMap = std::map<document::Bucket, ChainedMergeState>;
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
- typedef std::set<
+ using MergePriorityQueue = std::set<
StablePriorityOrderingWrapper<api::StorageMessage::SP>
- > MergePriorityQueue;
+ >;
- enum RendezvousState {
- RENDEZVOUS_NONE,
- RENDEZVOUS_REQUESTED,
- RENDEZVOUS_ESTABLISHED,
- RENDEZVOUS_RELEASED
+ enum class RendezvousState {
+ NONE,
+ REQUESTED,
+ ESTABLISHED,
+ RELEASED
};
ActiveMergeMap _merges;
MergePriorityQueue _queue;
size_t _maxQueueSize;
- mbus::StaticThrottlePolicy::UP _throttlePolicy;
+ std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
uint64_t _queueSequence; // TODO: move into a stable priority queue class
mutable std::mutex _messageLock;
std::condition_variable _messageCond;
@@ -177,6 +180,7 @@ private:
RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
std::chrono::steady_clock::duration _backpressure_duration;
+ bool _use_dynamic_throttling;
bool _disable_queue_limits_for_chained_merges;
bool _closing;
public:
@@ -213,8 +217,8 @@ public:
// For unit testing only
const MergePriorityQueue& getMergeQueue() const { return _queue; }
// For unit testing only
- const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
- mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
+ const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
+ mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -237,26 +241,26 @@ private:
MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
- const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
+ [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const noexcept {
return _sortedNodes;
}
- bool isIndexUnknown() const {
+ [[nodiscard]] bool isIndexUnknown() const noexcept {
return (_sortedIndex == UINT16_MAX);
}
/**
* This node is the merge executor if it's the first element in the
* _unsorted_ node sequence.
*/
- bool isMergeExecutor() const {
+ [[nodiscard]] bool isMergeExecutor() const noexcept {
return (_cmd.getNodes()[0].index == _thisIndex);
}
- uint16_t getExecutorNodeIndex() const{
+ [[nodiscard]] uint16_t getExecutorNodeIndex() const noexcept {
return _cmd.getNodes()[0].index;
}
- const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
+ [[nodiscard]] const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
return _cmd.getNodes();
}
- [[nodiscard]] bool isLastNode() const {
+ [[nodiscard]] bool isLastNode() const noexcept {
if (!_use_unordered_forwarding) {
return (_sortedIndex == _sortedNodes.size() - 1);
} else {
@@ -267,13 +271,13 @@ private:
/**
* Gets node to forward to in strictly increasing order.
*/
- uint16_t getNextNodeInChain() const;
+ [[nodiscard]] uint16_t getNextNodeInChain() const noexcept;
/**
* Returns true iff the chain vector (which is implicitly sorted)
* pairwise compares equally to the vector of sorted node indices
*/
- bool isChainCompleted() const;
+ [[nodiscard]] bool isChainCompleted() const noexcept;
};
/**
@@ -359,7 +363,7 @@ private:
[[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
[[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
- [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept;
+ [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,