diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 14:28:52 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 14:28:52 +0000 |
commit | 6d220d01be0b76921ffd0de9ecfd8256db5c176b (patch) | |
tree | 49ab123532b18b8475988e72dd74906fc570f286 /storage | |
parent | 605a60c9a2c95d7a67e674995a455c5c5f7f3545 (diff) |
Do not throttle source only merges during back-pressure
Diffstat (limited to 'storage')
3 files changed, 34 insertions, 6 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index ec16b76bf9a..5e00ba373d2 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -11,6 +11,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> +#include <unordered_set> #include <memory> #include <iterator> #include <vector> @@ -30,6 +31,7 @@ struct MergeBuilder { api::Timestamp _maxTimestamp; std::vector<uint16_t> _nodes; std::vector<uint16_t> _chain; + std::unordered_set<uint16_t> _source_only; uint64_t _clusterStateVersion; MergeBuilder(const document::BucketId& bucket) @@ -84,11 +86,17 @@ struct MergeBuilder { _chain.push_back(n2); return *this; } + MergeBuilder& source_only(uint16_t node) { + _source_only.insert(node); + return *this; + } api::MergeBucketCommand::SP create() const { std::vector<api::MergeBucketCommand::Node> n; for (uint32_t i = 0; i < _nodes.size(); ++i) { - n.push_back(_nodes[i]); + uint16_t node = _nodes[i]; + bool source_only = (_source_only.find(node) != _source_only.end()); + n.emplace_back(node, source_only); } std::shared_ptr<MergeBucketCommand> cmd( new MergeBucketCommand(_bucket, n, _maxTimestamp, @@ -137,6 +145,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); + CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -167,6 +176,7 @@ public: void testApplyBucketDiffCommandNotInActiveSetIsRejected(); void testNewClusterStateAbortsAllOutdatedActiveMerges(); void backpressure_busy_bounces_merges_for_configured_duration(); + void source_only_merges_are_not_affected_by_backpressure(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -1568,7 +1578,6 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() { _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); _throttlers[0]->applyTimedBackpressure(); - document::BucketId bucket(16, 6789); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); @@ -1590,7 +1599,16 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); } -// TODO test source only merges allowed through +void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() { + _servers[2]->getClock().setAbsoluteTimeInSeconds(1000); + _throttlers[2]->applyTimedBackpressure(); + document::BucketId bucket(16, 6789); + + _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create()); + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); +} // TODO test message queue aborting (use rendezvous functionality--make guard) diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 62dd6170825..cd3028dc6ba 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -654,18 +654,27 @@ MergeThrottler::run(framework::ThreadHandle& thread) } bool MergeThrottler::merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const { - (void) cmd; if (_throttle_until_time.time_since_epoch().count() == 0) { - return false; // Avoid sampling the clock if throttling time is zeroed already. + return false; + } + if (merge_has_this_node_as_source_only_node(cmd)) { + return false; } - // TODO source only bypass for own node if (_component.getClock().getMonotonicTime() < _throttle_until_time) { return true; } + // Avoid sampling the clock when it can't do anything useful. _throttle_until_time = std::chrono::steady_clock::time_point{}; return false; } +bool MergeThrottler::merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const { + auto self_is_source_only = [self = _component.getIndex()](auto& node) { + return (node.index == self) && node.sourceOnly; + }; + return std::any_of(cmd.getNodes().begin(), cmd.getNodes().end(), self_is_source_only); +} + void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard) { sendReply(cmd, api::ReturnCode(api::ReturnCode::BUSY, "Node is throttling merges due to resource exhaustion back-pressure"), diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 22c3c106b37..67bb01dd8fa 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -336,6 +336,7 @@ private: bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const; void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard); + bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const; void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, |