diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-09-05 16:37:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-05 16:37:25 +0200 |
commit | 960f6cd949db901b04b0c5e7541bd2d6d8c1eee9 (patch) | |
tree | 3849da9d6e27f03b9011b0ead0b95e0a8123eea1 | |
parent | fb1818824b94e1ee4ca553065a2f6ba73b219ca7 (diff) | |
parent | 5f7087adc6b74793c9f295ec732c22f3ec95e2e5 (diff) |
Merge pull request #3331 from vespa-engine/vekterli/evict-merge-throttler-queues-on-backpressure
Evict all queued merges from throttler on back-pressure
3 files changed, 45 insertions, 10 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 29ff780807f..aa0c0667b0d 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -146,6 +146,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure); + CPPUNIT_TEST(backpressure_evicts_all_queued_merges); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -177,6 +178,7 @@ public: void testNewClusterStateAbortsAllOutdatedActiveMerges(); void backpressure_busy_bounces_merges_for_configured_duration(); void source_only_merges_are_not_affected_by_backpressure(); + void backpressure_evicts_all_queued_merges(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -193,6 +195,8 @@ private: const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode); + + void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); }; const int MergeThrottlerTest::_storageNodeCount; @@ -1586,7 +1590,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); - sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(), + sendAndExpectReply(MergeBuilder(bucket).create(), api::MessageType::MERGEBUCKET_REPLY, api::ReturnCode::BUSY); @@ -1596,7 +1600,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds // Backpressure has now been lifted. New merges should be forwarded // to next node in chain as expected instead of being bounced with a reply. - sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + sendMerge(MergeBuilder(bucket)); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); @@ -1608,12 +1612,35 @@ void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() { _throttlers[2]->apply_timed_backpressure(); document::BucketId bucket(16, 6789); - _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create()); + _topLinks[2]->sendDown(MergeBuilder(bucket).chain(0, 1).source_only(2).create()); _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); } +void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { + std::size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); + for (std::size_t i = 0; i < max_pending + queued_count; ++i) { + _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create()); + } + + // Wait till we have max_pending merge forwards and queued_count enqueued. + _topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime); +} + +void MergeThrottlerTest::backpressure_evicts_all_queued_merges() { + _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); + + fill_throttler_queue_with_n_commands(0, 1); + _topLinks[0]->getRepliesOnce(); // Clear all forwarded merges + _throttlers[0]->apply_timed_backpressure(); + + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + auto reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); +} + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 483992559e7..943059c00fc 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -520,13 +520,11 @@ MergeThrottler::rejectOutdatedQueuedMerges( uint32_t rejectLessThanVersion) { // Flush all queued merges that have an outdated version - MergePriorityQueue::iterator queueEnd = _queue.end(); - for (MergePriorityQueue::iterator i = _queue.begin(); i != queueEnd;) { - MergePriorityQueue::iterator erase_iter = i; + auto queueEnd = _queue.end(); + for (auto i = _queue.begin(); i != queueEnd;) { + auto erase_iter = i; ++i; - if (rejectMergeIfOutdated( - erase_iter->_msg, rejectLessThanVersion, msgGuard)) - { + if (rejectMergeIfOutdated(erase_iter->_msg, rejectLessThanVersion, msgGuard)){ _queue.erase(erase_iter); } } @@ -687,8 +685,17 @@ void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketC } void MergeThrottler::apply_timed_backpressure() { - vespalib::LockGuard lock(_stateLock); + MessageGuard msg_guard(_stateLock, *this); _throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration; + backpressure_bounce_all_queued_merges(msg_guard); +} + +void MergeThrottler::backpressure_bounce_all_queued_merges(MessageGuard& guard) { + for (auto& qm : _queue) { + auto& merge_cmd = dynamic_cast<api::MergeBucketCommand&>(*qm._msg); + bounce_backpressure_throttled_merge(merge_cmd, guard); + } + _queue.clear(); } bool MergeThrottler::backpressure_mode_active() const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 070c2ef07c4..69fdfdc1b95 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -340,6 +340,7 @@ private: void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard); bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const; bool backpressure_mode_active_no_lock() const; + void backpressure_bounce_all_queued_merges(MessageGuard& guard); void sendReply(const api::MergeBucketCommand& cmd, const api::ReturnCode& result, |