diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 13:49:05 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 13:49:05 +0000 |
commit | 605a60c9a2c95d7a67e674995a455c5c5f7f3545 (patch) | |
tree | cf796cdd42f829a764fa3bcc3e2866047eebe2f2 /storage | |
parent | 75e055d4bb57d3323038fca456b8ec963d1994f9 (diff) |
Add backpressure to MergeThrottler with configurable timing
Diffstat (limited to 'storage')
5 files changed, 131 insertions, 55 deletions
diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp index fa85f4c6a45..296a9a3cc0f 100644 --- a/storage/src/tests/common/testhelper.cpp +++ b/storage/src/tests/common/testhelper.cpp @@ -150,6 +150,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro config->set("enable_dead_lock_detector_warnings", "false"); config->set("max_merges_per_node", "25"); config->set("max_merge_queue_size", "20"); + config->set("resource_exhaustion_merge_back_pressure_duration_secs", "15.0"); vespalib::string rootFolder = rootOfRoot + "_"; rootFolder += (storagenode ? "vdsroot" : "vdsroot.distributor"); config->set("root_folder", rootFolder); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 559d54d3620..ec16b76bf9a 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,10 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <cppunit/extensions/HelperMacros.h> -#include <memory> -#include <iterator> -#include <vector> -#include <algorithm> -#include <ctime> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> @@ -16,6 +11,12 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> +#include <memory> +#include <iterator> +#include <vector> +#include <algorithm> +#include <chrono> +#include <thread> using namespace document; using namespace storage::api; @@ -24,8 +25,7 @@ namespace storage { namespace { -struct MergeBuilder -{ +struct MergeBuilder { document::BucketId _bucket; api::Timestamp _maxTimestamp; std::vector<uint16_t> _nodes; @@ -41,6 +41,8 @@ struct MergeBuilder nodes(0, 1, 2); } + ~MergeBuilder(); + MergeBuilder& nodes(uint16_t n0) { _nodes.push_back(n0); return *this; @@ -97,6 +99,8 @@ struct MergeBuilder } }; +MergeBuilder::~MergeBuilder() {} + std::shared_ptr<api::SetSystemStateCommand> makeSystemStateCmd(const std::string& state) { @@ -106,8 +110,7 @@ makeSystemStateCmd(const std::string& state) } // anon ns -class MergeThrottlerTest : public CppUnit::TestFixture -{ +class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(MergeThrottlerTest); CPPUNIT_TEST(testMergesConfig); CPPUNIT_TEST(testChain); @@ -133,6 +136,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); + CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -162,6 +166,7 @@ public: void testGetBucketDiffCommandNotInActiveSetIsRejected(); void testApplyBucketDiffCommandNotInActiveSetIsRejected(); void testNewClusterStateAbortsAllOutdatedActiveMerges(); + void backpressure_busy_bounces_merges_for_configured_duration(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -247,7 +252,7 @@ checkChain(const StorageMessage::SP& msg, void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout) { - std::time_t start = std::time(0); + const auto start = std::chrono::steady_clock::now(); while (true) { std::size_t count; { @@ -257,14 +262,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou if (count == sz) { break; } - std::time_t now = std::time(0); - if (now - start > timeout) { + auto now = std::chrono::steady_clock::now(); + if (now - start > std::chrono::seconds(timeout)) { std::ostringstream os; os << "Timeout while waiting for merge queue with " << sz << " items. Had " << count << " at timeout."; throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC); } - FastOS_Thread::Sleep(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -1491,8 +1496,7 @@ MergeThrottlerTest::sendAndExpectReply( _topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime); StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage( expectedReplyType)); - api::StorageReply& storageReply( - dynamic_cast<api::StorageReply&>(*reply)); + auto& storageReply = dynamic_cast<api::StorageReply&>(*reply); CPPUNIT_ASSERT_EQUAL(expectedResultCode, storageReply.getResult().getResult()); } @@ -1561,6 +1565,33 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() } } +void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() { + _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); + _throttlers[0]->applyTimedBackpressure(); + + document::BucketId bucket(16, 6789); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(), + api::MessageType::MERGEBUCKET_REPLY, + api::ReturnCode::BUSY); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds + // Backpressure has now been lifted. New merges should be forwarded + // to next node in chain as expected instead of being bounced with a reply. + sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); +} + +// TODO test source only merges allowed through + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index cf2226b3a3b..fbc29e0234b 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -41,6 +41,15 @@ node_reliability int default=1 restart max_merges_per_node int default=16 max_merge_queue_size int default=1024 +## If the persistence provider indicates that it has exhausted one or more +## of its internal resources during a mutating operation, new merges will +## be bounced for this duration. Not allowing further merges helps take +## load off the node while it e.g. compacts its data stores or memory in +## the background. +## Note: this does not affect merges where the current node is marked as +## "source only", as merges do not cause mutations on such nodes. +resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 + ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false restart diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index eae252c22cd..62dd6170825 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -16,8 +16,7 @@ namespace storage { namespace { -struct NodeComparator -{ +struct NodeComparator { bool operator()(const api::MergeBucketCommand::Node& a, const api::MergeBucketCommand::Node& b) const { @@ -28,8 +27,7 @@ struct NodeComparator // Class used to sneakily get around IThrottlePolicy only accepting // messagebus objects template <typename Base> -class DummyMbusMessage : public Base -{ +class DummyMbusMessage : public Base { private: static const mbus::string NAME; public: @@ -70,6 +68,7 @@ MergeThrottler::ChainedMergeState::~ChainedMergeState() {} MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) : metrics::MetricSet("mergethrottler", "", "", owner), averageQueueWaitingTime("averagequeuewaitingtime", "", "Average time a merge spends in the throttler queue", this), + bounced_due_to_back_pressure("bounced_due_to_back_pressure", "", "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) { } @@ -200,6 +199,8 @@ MergeThrottler::MergeThrottler( _component(compReg, "mergethrottler"), _thread(), _rendezvous(RENDEZVOUS_NONE), + _throttle_until_time(), + _backpressure_duration(std::chrono::seconds(30)), _closing(false) { _throttlePolicy->setMaxPendingCount(20); @@ -215,12 +216,13 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ vespalib::LockGuard lock(_stateLock); if (newConfig->maxMergesPerNode < 1) { - throw config::InvalidConfigException( - "Cannot have a max merge count of less than 1"); + throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } if (newConfig->maxMergeQueueSize < 0) { - throw config::InvalidConfigException( - "Max merge queue size cannot be less than 0"); + throw config::InvalidConfigException("Max merge queue size cannot be less than 0"); + } + if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { + throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } if (static_cast<double>(newConfig->maxMergesPerNode) != _throttlePolicy->getMaxPendingCount()) @@ -232,6 +234,8 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ LOG(debug, "Setting new max queue size to %d", newConfig->maxMergeQueueSize); _maxQueueSize = newConfig->maxMergeQueueSize; + _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( + std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); } MergeThrottler::~MergeThrottler() @@ -276,9 +280,9 @@ MergeThrottler::onClose() LOG(debug, "onClose; active: %" PRIu64 ", queued: %" PRIu64, _merges.size(), _queue.size()); } - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(&_messageLock); - _thread.reset(0); + _thread.reset(); } } @@ -408,8 +412,7 @@ MergeThrottler::enqueueMerge( MessageGuard& msgGuard) { LOG(spam, "Enqueuing %s", msg->toString().c_str()); - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); MergeNodeSequence nodeSeq(mergeCmd, _component.getIndex()); if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; @@ -427,8 +430,7 @@ MergeThrottler::canProcessNewMerge() const bool MergeThrottler::isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const { - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); return _merges.find(mergeCmd.getBucketId()) != _merges.end(); } @@ -441,8 +443,7 @@ MergeThrottler::rejectMergeIfOutdated( // Only reject merge commands! never reject replies (for obvious reasons..) assert(msg->getType() == api::MessageType::MERGEBUCKET); - const api::MergeBucketCommand& cmd( - static_cast<const api::MergeBucketCommand&>(*msg)); + auto& cmd = static_cast<const api::MergeBucketCommand&>(*msg); if (cmd.getClusterStateVersion() == 0 || cmd.getClusterStateVersion() >= rejectLessThanVersion) @@ -455,9 +456,7 @@ MergeThrottler::rejectMergeIfOutdated( << ", storage node has version " << rejectLessThanVersion; sendReply(cmd, - api::ReturnCode( - api::ReturnCode::WRONG_DISTRIBUTION, - oss.str()), + api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, oss.str()), msgGuard, _metrics->chaining); LOG(debug, "Immediately rejected %s, due to it having state version < %u", cmd.toString().c_str(), rejectLessThanVersion); @@ -654,6 +653,26 @@ MergeThrottler::run(framework::ThreadHandle& thread) LOG(debug, "Returning from MergeThrottler working thread"); } +bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const { + (void) cmd; + if (_throttle_until_time.time_since_epoch().count() == 0) { + return false; // Avoid sampling the clock if throttling time is zeroed already. + } + // TODO source only bypass for own node + if (_component.getClock().getMonotonicTime() < _throttle_until_time) { + return true; + } + _throttle_until_time = std::chrono::steady_clock::time_point{}; + return false; +} + +void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) { + sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY, + "Node is throttling merges due to resource exhaustion back-pressure"), + guard, _metrics->local); + _metrics->bounced_due_to_back_pressure.inc(); +} + // Must be run from worker thread void MergeThrottler::handleMessageDown( @@ -661,22 +680,24 @@ MergeThrottler::handleMessageDown( MessageGuard& msgGuard) { if (msg->getType() == api::MessageType::MERGEBUCKET) { - const api::MergeBucketCommand& mergeCmd - = static_cast<const api::MergeBucketCommand&>(*msg); + auto& mergeCmd = static_cast<const api::MergeBucketCommand&>(*msg); - uint32_t ourVersion( - _component.getStateUpdater().getSystemState()->getVersion()); + uint32_t ourVersion = _component.getStateUpdater().getSystemState()->getVersion(); if (mergeCmd.getClusterStateVersion() > ourVersion) { LOG(debug, "Merge %s with newer cluster state than us arrived", mergeCmd.toString().c_str()); - rejectOutdatedQueuedMerges( - msgGuard, mergeCmd.getClusterStateVersion()); + rejectOutdatedQueuedMerges(msgGuard, mergeCmd.getClusterStateVersion()); } else if (rejectMergeIfOutdated(msg, ourVersion, msgGuard)) { // Skip merge entirely return; } + if (merge_is_backpressure_throttled(mergeCmd)) { + bounce_backpressure_throttled_merge(mergeCmd, msgGuard); + return; + } + if (isMergeAlreadyKnown(msg)) { processCycledMergeCommand(msg, msgGuard); } else if (canProcessNewMerge()) { @@ -686,8 +707,7 @@ MergeThrottler::handleMessageDown( } else { // No more room at the inn. Return BUSY so that the // distributor will wait a bit before retrying - LOG(debug, "Queue is full; busy-returning %s", - mergeCmd.toString().c_str()); + LOG(debug, "Queue is full; busy-returning %s", mergeCmd.toString().c_str()); sendReply(mergeCmd, api::ReturnCode(api::ReturnCode::BUSY, "Merge queue is full"), msgGuard, _metrics->local); } @@ -1204,6 +1224,12 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } } +void MergeThrottler::applyTimedBackpressure() { + vespalib::LockGuard lock(_stateLock); + _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration; + // TODO decide if we should abort active merges +} + void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index a93dab4e24e..22c3c106b37 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -18,6 +18,7 @@ #include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/metrics/metrics.h> #include <vespa/config/config.h> +#include <chrono> namespace storage { @@ -29,8 +30,7 @@ class MergeThrottler : public framework::Runnable, private config::IFetcherCallback<vespa::config::content::core::StorServerConfig> { public: - class MergeFailureMetrics : public metrics::MetricSet - { + class MergeFailureMetrics : public metrics::MetricSet { public: metrics::SumMetric<metrics::LongCountMetric> sum; metrics::LongCountMetric notready; @@ -47,8 +47,7 @@ public: ~MergeFailureMetrics(); }; - class MergeOperationMetrics : public metrics::MetricSet - { + class MergeOperationMetrics : public metrics::MetricSet { public: metrics::LongCountMetric ok; MergeFailureMetrics failures; @@ -57,10 +56,10 @@ public: ~MergeOperationMetrics(); }; - class Metrics : public metrics::MetricSet - { + class Metrics : public metrics::MetricSet { public: metrics::DoubleAverageMetric averageQueueWaitingTime; + metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -71,8 +70,7 @@ public: private: // TODO: make PQ with stable ordering into own, generic class template <class MessageType> - struct StablePriorityOrderingWrapper - { + struct StablePriorityOrderingWrapper { MessageType _msg; metrics::MetricTimer _startTimer; uint64_t _sequence; @@ -95,8 +93,7 @@ private: } }; - struct ChainedMergeState - { + struct ChainedMergeState { api::StorageMessage::SP _cmd; std::string _cmdString; // For being able to print message even when we don't own it uint64_t _clusterStateVersion; @@ -167,6 +164,8 @@ private: StorageComponent _component; framework::Thread::UP _thread; RendezvousState _rendezvous; + mutable std::chrono::steady_clock::time_point _throttle_until_time; + std::chrono::steady_clock::duration _backpressure_duration; bool _closing; public: /** @@ -174,7 +173,7 @@ public: * than 1 as their window size. */ MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); - ~MergeThrottler(); + ~MergeThrottler() override; /** Implements document::Runnable::run */ void run(framework::ThreadHandle&) override; @@ -187,6 +186,14 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override; + /* + * When invoked, merges to the node will be BUSY-bounced by the throttler + * for a configurable period of time instead of being processed. + * + * Must not be called if _stateLock is already held, or deadlock will occur. + */ + void applyTimedBackpressure(); + // For unit testing only const ActiveMergeMap& getActiveMerges() const { return _merges; } // For unit testing only @@ -206,8 +213,7 @@ private: friend class ThreadRendezvousGuard; // impl in .cpp file // Simple helper class for centralizing chaining logic - struct MergeNodeSequence - { + struct MergeNodeSequence { const api::MergeBucketCommand& _cmd; std::vector<api::MergeBucketCommand::Node> _sortedNodes; std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence @@ -328,6 +334,9 @@ private: */ bool canProcessNewMerge() const; + bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const; + void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard); + void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, MessageGuard& msgGuard, |