summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 13:49:05 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 13:49:05 +0000
commit605a60c9a2c95d7a67e674995a455c5c5f7f3545 (patch)
treecf796cdd42f829a764fa3bcc3e2866047eebe2f2 /storage
parent75e055d4bb57d3323038fca456b8ec963d1994f9 (diff)
Add backpressure to MergeThrottler with configurable timing
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/testhelper.cpp1
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp61
-rw-r--r--storage/src/vespa/storage/config/stor-server.def9
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp80
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h35
5 files changed, 131 insertions, 55 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp
index fa85f4c6a45..296a9a3cc0f 100644
--- a/storage/src/tests/common/testhelper.cpp
+++ b/storage/src/tests/common/testhelper.cpp
@@ -150,6 +150,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro
config->set("enable_dead_lock_detector_warnings", "false");
config->set("max_merges_per_node", "25");
config->set("max_merge_queue_size", "20");
+ config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0");
vespalib::string rootFolder = rootOfRoot + "_";
rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor");
config->set("root_folder", rootFolder);
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 559d54d3620..ec16b76bf9a 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,10 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <cppunit/extensions/HelperMacros.h>
-#include <memory>
-#include <iterator>
-#include <vector>
-#include <algorithm>
-#include <ctime>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
@@ -16,6 +11,12 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <memory>
+#include <iterator>
+#include <vector>
+#include <algorithm>
+#include <chrono>
+#include <thread>
using namespace document;
using namespace storage::api;
@@ -24,8 +25,7 @@ namespace storage {
namespace {
-struct MergeBuilder
-{
+struct MergeBuilder {
document::BucketId _bucket;
api::Timestamp _maxTimestamp;
std::vector<uint16_t> _nodes;
@@ -41,6 +41,8 @@ struct MergeBuilder
nodes(0, 1, 2);
}
+ ~MergeBuilder();
+
MergeBuilder& nodes(uint16_t n0) {
_nodes.push_back(n0);
return *this;
@@ -97,6 +99,8 @@ struct MergeBuilder
}
};
+MergeBuilder::~MergeBuilder() {}
+
std::shared_ptr<api::SetSystemStateCommand>
makeSystemStateCmd(const std::string& state)
{
@@ -106,8 +110,7 @@ makeSystemStateCmd(const std::string& state)
} // anon ns
-class MergeThrottlerTest : public CppUnit::TestFixture
-{
+class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(MergeThrottlerTest);
CPPUNIT_TEST(testMergesConfig);
CPPUNIT_TEST(testChain);
@@ -133,6 +136,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture
CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
+ CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -162,6 +166,7 @@ public:
void testGetBucketDiffCommandNotInActiveSetIsRejected();
void testApplyBucketDiffCommandNotInActiveSetIsRejected();
void testNewClusterStateAbortsAllOutdatedActiveMerges();
+ void backpressure_busy_bounces_merges_for_configured_duration();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -247,7 +252,7 @@ checkChain(const StorageMessage::SP& msg,
void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
{
- std::time_t start = std::time(0);
+ const auto start = std::chrono::steady_clock::now();
while (true) {
std::size_t count;
{
@@ -257,14 +262,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou
if (count == sz) {
break;
}
- std::time_t now = std::time(0);
- if (now - start > timeout) {
+ auto now = std::chrono::steady_clock::now();
+ if (now - start > std::chrono::seconds(timeout)) {
std::ostringstream os;
os << "Timeout while waiting for merge queue with " << sz << " items. Had "
<< count << " at timeout.";
throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC);
}
- FastOS_Thread::Sleep(1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@@ -1491,8 +1496,7 @@ MergeThrottlerTest::sendAndExpectReply(
_topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime);
StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage(
expectedReplyType));
- api::StorageReply& storageReply(
- dynamic_cast<api::StorageReply&>(*reply));
+ auto& storageReply = dynamic_cast<api::StorageReply&>(*reply);
CPPUNIT_ASSERT_EQUAL(expectedResultCode,
storageReply.getResult().getResult());
}
@@ -1561,6 +1565,33 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
}
}
+void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() {
+ _servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
+ _throttlers[0]->applyTimedBackpressure();
+
+ document::BucketId bucket(16, 6789);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(),
+ api::MessageType::MERGEBUCKET_REPLY,
+ api::ReturnCode::BUSY);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds
+ // Backpressure has now been lifted. New merges should be forwarded
+ // to next node in chain as expected instead of being bounced with a reply.
+ sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+}
+
+// TODO test source only merges allowed through
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index cf2226b3a3b..fbc29e0234b 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -41,6 +41,15 @@ node_reliability int default=1 restart
max_merges_per_node int default=16
max_merge_queue_size int default=1024
+## If the persistence provider indicates that it has exhausted one or more
+## of its internal resources during a mutating operation, new merges will
+## be bounced for this duration. Not allowing further merges helps take
+## load off the node while it e.g. compacts its data stores or memory in
+## the background.
+## Note: this does not affect merges where the current node is marked as
+## "source only", as merges do not cause mutations on such nodes.
+resource_exhaustion_merge_back_pressure_duration_secs double default=30.0
+
## Whether the deadlock detector should be enabled or not. If disabled, it will
## still run, but it will never actually abort the process it is running in.
enable_dead_lock_detector bool default=false restart
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index eae252c22cd..62dd6170825 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -16,8 +16,7 @@ namespace storage {
namespace {
-struct NodeComparator
-{
+struct NodeComparator {
bool operator()(const api::MergeBucketCommand::Node& a,
const api::MergeBucketCommand::Node& b) const
{
@@ -28,8 +27,7 @@ struct NodeComparator
// Class used to sneakily get around IThrottlePolicy only accepting
// messagebus objects
template <typename Base>
-class DummyMbusMessage : public Base
-{
+class DummyMbusMessage : public Base {
private:
static const mbus::string NAME;
public:
@@ -70,6 +68,7 @@ MergeThrottler::ChainedMergeState::~ChainedMergeState() {}
MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
: metrics::MetricSet("mergethrottler", "", "", owner),
averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this),
+ bounced_due_to_back_pressure("bounced_due_to_back_pressure", "", "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
{ }
@@ -200,6 +199,8 @@ MergeThrottler::MergeThrottler(
_component(compReg, "mergethrottler"),
_thread(),
_rendezvous(RENDEZVOUS_NONE),
+ _throttle_until_time(),
+ _backpressure_duration(std::chrono::seconds(30)),
_closing(false)
{
_throttlePolicy->setMaxPendingCount(20);
@@ -215,12 +216,13 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
vespalib::LockGuard lock(_stateLock);
if (newConfig->maxMergesPerNode < 1) {
- throw config::InvalidConfigException(
- "Cannot have a max merge count of less than 1");
+ throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
if (newConfig->maxMergeQueueSize < 0) {
- throw config::InvalidConfigException(
- "Max merge queue size cannot be less than 0");
+ throw config::InvalidConfigException("Max merge queue size cannot be less than 0");
+ }
+ if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
+ throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
if (static_cast<double>(newConfig->maxMergesPerNode)
!= _throttlePolicy->getMaxPendingCount())
@@ -232,6 +234,8 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
LOG(debug, "Setting new max queue size to %d",
newConfig->maxMergeQueueSize);
_maxQueueSize = newConfig->maxMergeQueueSize;
+ _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
+ std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs));
}
MergeThrottler::~MergeThrottler()
@@ -276,9 +280,9 @@ MergeThrottler::onClose()
LOG(debug, "onClose; active: %" PRIu64 ", queued: %" PRIu64,
_merges.size(), _queue.size());
}
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interruptAndJoin(&_messageLock);
- _thread.reset(0);
+ _thread.reset();
}
}
@@ -408,8 +412,7 @@ MergeThrottler::enqueueMerge(
MessageGuard& msgGuard)
{
LOG(spam, "Enqueuing %s", msg->toString().c_str());
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex());
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
@@ -427,8 +430,7 @@ MergeThrottler::canProcessNewMerge() const
bool
MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const
{
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
return _merges.find(mergeCmd.getBucketId()) != _merges.end();
}
@@ -441,8 +443,7 @@ MergeThrottler::rejectMergeIfOutdated(
// Only reject merge commands! never reject replies (for obvious reasons..)
assert(msg->getType() == api::MessageType::MERGEBUCKET);
- const api::MergeBucketCommand& cmd(
- static_cast<const api::MergeBucketCommand&>(*msg));
+ auto& cmd = static_cast<const api::MergeBucketCommand&>(*msg);
if (cmd.getClusterStateVersion() == 0
|| cmd.getClusterStateVersion() >= rejectLessThanVersion)
@@ -455,9 +456,7 @@ MergeThrottler::rejectMergeIfOutdated(
<< ", storage node has version "
<< rejectLessThanVersion;
sendReply(cmd,
- api::ReturnCode(
- api::ReturnCode::WRONG_DISTRIBUTION,
- oss.str()),
+ api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, oss.str()),
msgGuard, _metrics->chaining);
LOG(debug, "Immediately rejected %s, due to it having state version < %u",
cmd.toString().c_str(), rejectLessThanVersion);
@@ -654,6 +653,26 @@ MergeThrottler::run(framework::ThreadHandle& thread)
LOG(debug, "Returning from MergeThrottler working thread");
}
+bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const {
+ (void) cmd;
+ if (_throttle_until_time.time_since_epoch().count() == 0) {
+ return false; // Avoid sampling the clock if throttling time is zeroed already.
+ }
+ // TODO source only bypass for own node
+ if (_component.getClock().getMonotonicTime() < _throttle_until_time) {
+ return true;
+ }
+ _throttle_until_time = std::chrono::steady_clock::time_point{};
+ return false;
+}
+
+void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) {
+ sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY,
+ "Node is throttling merges due to resource exhaustion back-pressure"),
+ guard, _metrics->local);
+ _metrics->bounced_due_to_back_pressure.inc();
+}
+
// Must be run from worker thread
void
MergeThrottler::handleMessageDown(
@@ -661,22 +680,24 @@ MergeThrottler::handleMessageDown(
MessageGuard& msgGuard)
{
if (msg->getType() == api::MessageType::MERGEBUCKET) {
- const api::MergeBucketCommand& mergeCmd
- = static_cast<const api::MergeBucketCommand&>(*msg);
+ auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg);
- uint32_t ourVersion(
- _component.getStateUpdater().getSystemState()->getVersion());
+ uint32_t ourVersion = _component.getStateUpdater().getSystemState()->getVersion();
if (mergeCmd.getClusterStateVersion() > ourVersion) {
LOG(debug, "Merge %s with newer cluster state than us arrived",
mergeCmd.toString().c_str());
- rejectOutdatedQueuedMerges(
- msgGuard, mergeCmd.getClusterStateVersion());
+ rejectOutdatedQueuedMerges(msgGuard, mergeCmd.getClusterStateVersion());
} else if (rejectMergeIfOutdated(msg, ourVersion, msgGuard)) {
// Skip merge entirely
return;
}
+ if (merge_is_backpressure_throttled(mergeCmd)) {
+ bounce_backpressure_throttled_merge(mergeCmd, msgGuard);
+ return;
+ }
+
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
} else if (canProcessNewMerge()) {
@@ -686,8 +707,7 @@ MergeThrottler::handleMessageDown(
} else {
// No more room at the inn. Return BUSY so that the
// distributor will wait a bit before retrying
- LOG(debug, "Queue is full; busy-returning %s",
- mergeCmd.toString().c_str());
+ LOG(debug, "Queue is full; busy-returning %s", mergeCmd.toString().c_str());
sendReply(mergeCmd, api::ReturnCode(api::ReturnCode::BUSY, "Merge queue is full"),
msgGuard, _metrics->local);
}
@@ -1204,6 +1224,12 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
}
+void MergeThrottler::applyTimedBackpressure() {
+ vespalib::LockGuard lock(_stateLock);
+ _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration;
+ // TODO decide if we should abort active merges
+}
+
void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index a93dab4e24e..22c3c106b37 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -18,6 +18,7 @@
#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metrics.h>
#include <vespa/config/config.h>
+#include <chrono>
namespace storage {
@@ -29,8 +30,7 @@ class MergeThrottler : public framework::Runnable,
private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>
{
public:
- class MergeFailureMetrics : public metrics::MetricSet
- {
+ class MergeFailureMetrics : public metrics::MetricSet {
public:
metrics::SumMetric<metrics::LongCountMetric> sum;
metrics::LongCountMetric notready;
@@ -47,8 +47,7 @@ public:
~MergeFailureMetrics();
};
- class MergeOperationMetrics : public metrics::MetricSet
- {
+ class MergeOperationMetrics : public metrics::MetricSet {
public:
metrics::LongCountMetric ok;
MergeFailureMetrics failures;
@@ -57,10 +56,10 @@ public:
~MergeOperationMetrics();
};
- class Metrics : public metrics::MetricSet
- {
+ class Metrics : public metrics::MetricSet {
public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
+ metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -71,8 +70,7 @@ public:
private:
// TODO: make PQ with stable ordering into own, generic class
template <class MessageType>
- struct StablePriorityOrderingWrapper
- {
+ struct StablePriorityOrderingWrapper {
MessageType _msg;
metrics::MetricTimer _startTimer;
uint64_t _sequence;
@@ -95,8 +93,7 @@ private:
}
};
- struct ChainedMergeState
- {
+ struct ChainedMergeState {
api::StorageMessage::SP _cmd;
std::string _cmdString; // For being able to print message even when we don't own it
uint64_t _clusterStateVersion;
@@ -167,6 +164,8 @@ private:
StorageComponent _component;
framework::Thread::UP _thread;
RendezvousState _rendezvous;
+ mutable std::chrono::steady_clock::time_point _throttle_until_time;
+ std::chrono::steady_clock::duration _backpressure_duration;
bool _closing;
public:
/**
@@ -174,7 +173,7 @@ public:
* than 1 as their window size.
*/
MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
- ~MergeThrottler();
+ ~MergeThrottler() override;
/** Implements document::Runnable::run */
void run(framework::ThreadHandle&) override;
@@ -187,6 +186,14 @@ public:
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override;
+ /*
+ * When invoked, merges to the node will be BUSY-bounced by the throttler
+ * for a configurable period of time instead of being processed.
+ *
+ * Must not be called if _stateLock is already held, or deadlock will occur.
+ */
+ void applyTimedBackpressure();
+
// For unit testing only
const ActiveMergeMap& getActiveMerges() const { return _merges; }
// For unit testing only
@@ -206,8 +213,7 @@ private:
friend class ThreadRendezvousGuard; // impl in .cpp file
// Simple helper class for centralizing chaining logic
- struct MergeNodeSequence
- {
+ struct MergeNodeSequence {
const api::MergeBucketCommand& _cmd;
std::vector<api::MergeBucketCommand::Node> _sortedNodes;
std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
@@ -328,6 +334,9 @@ private:
*/
bool canProcessNewMerge() const;
+ bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
+ void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
+
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,
MessageGuard& msgGuard,