aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-05 16:37:25 +0200
committerGitHub <noreply@github.com>2017-09-05 16:37:25 +0200
commit960f6cd949db901b04b0c5e7541bd2d6d8c1eee9 (patch)
tree3849da9d6e27f03b9011b0ead0b95e0a8123eea1
parentfb1818824b94e1ee4ca553065a2f6ba73b219ca7 (diff)
parent5f7087adc6b74793c9f295ec732c22f3ec95e2e5 (diff)
Merge pull request #3331 from vespa-engine/vekterli/evict-merge-throttler-queues-on-backpressure
Evict all queued merges from throttler on back-pressure
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp21
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
3 files changed, 45 insertions, 10 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 29ff780807f..aa0c0667b0d 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -146,6 +146,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure);
+ CPPUNIT_TEST(backpressure_evicts_all_queued_merges);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -177,6 +178,7 @@ public:
void testNewClusterStateAbortsAllOutdatedActiveMerges();
void backpressure_busy_bounces_merges_for_configured_duration();
void source_only_merges_are_not_affected_by_backpressure();
+ void backpressure_evicts_all_queued_merges();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -193,6 +195,8 @@ private:
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+
+ void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
};
const int MergeThrottlerTest::_storageNodeCount;
@@ -1586,7 +1590,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(),
+ sendAndExpectReply(MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY);
@@ -1596,7 +1600,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
_servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds
// Backpressure has now been lifted. New merges should be forwarded
// to next node in chain as expected instead of being bounced with a reply.
- sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ sendMerge(MergeBuilder(bucket));
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
@@ -1608,12 +1612,35 @@ void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() {
_throttlers[2]->apply_timed_backpressure();
document::BucketId bucket(16, 6789);
- _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create());
+ _topLinks[2]->sendDown(MergeBuilder(bucket).chain(0, 1).source_only(2).create());
_topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
}
+void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
+ std::size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ for (std::size_t i = 0; i < max_pending + queued_count; ++i) {
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ }
+
+ // Wait till we have max_pending merge forwards and queued_count enqueued.
+ _topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
+ waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);
+}
+
+void MergeThrottlerTest::backpressure_evicts_all_queued_merges() {
+ _servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
+
+ fill_throttler_queue_with_n_commands(0, 1);
+ _topLinks[0]->getRepliesOnce(); // Clear all forwarded merges
+ _throttlers[0]->apply_timed_backpressure();
+
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+ auto reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
+}
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 483992559e7..943059c00fc 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -520,13 +520,11 @@ MergeThrottler::rejectOutdatedQueuedMerges(
uint32_t rejectLessThanVersion)
{
// Flush all queued merges that have an outdated version
- MergePriorityQueue::iterator queueEnd = _queue.end();
- for (MergePriorityQueue::iterator i = _queue.begin(); i != queueEnd;) {
- MergePriorityQueue::iterator erase_iter = i;
+ auto queueEnd = _queue.end();
+ for (auto i = _queue.begin(); i != queueEnd;) {
+ auto erase_iter = i;
++i;
- if (rejectMergeIfOutdated(
- erase_iter->_msg, rejectLessThanVersion, msgGuard))
- {
+ if (rejectMergeIfOutdated(erase_iter->_msg, rejectLessThanVersion, msgGuard)){
_queue.erase(erase_iter);
}
}
@@ -687,8 +685,17 @@ void MergeThrottler::bounce_backpressure_throttled_merge(const api::MergeBucketC
}
void MergeThrottler::apply_timed_backpressure() {
- vespalib::LockGuard lock(_stateLock);
+ MessageGuard msg_guard(_stateLock, *this);
_throttle_until_time = _component.getClock().getMonotonicTime() + _backpressure_duration;
+ backpressure_bounce_all_queued_merges(msg_guard);
+}
+
+void MergeThrottler::backpressure_bounce_all_queued_merges(MessageGuard& guard) {
+ for (auto& qm : _queue) {
+ auto& merge_cmd = dynamic_cast<api::MergeBucketCommand&>(*qm._msg);
+ bounce_backpressure_throttled_merge(merge_cmd, guard);
+ }
+ _queue.clear();
}
bool MergeThrottler::backpressure_mode_active() const {
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 070c2ef07c4..69fdfdc1b95 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -340,6 +340,7 @@ private:
void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
bool backpressure_mode_active_no_lock() const;
+ void backpressure_bounce_all_queued_merges(MessageGuard& guard);
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,