aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-05 13:55:42 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-05 13:55:42 +0000
commit5f7087adc6b74793c9f295ec732c22f3ec95e2e5 (patch)
tree7661af2c3442607f5fe8a3add05e5819785a3a96 /storage/src/tests/storageserver
parentb79e0e6ebd6ea52df63f0468fe51000fd770b135 (diff)
Evict all queued merges from throttler on backpressure
Diffstat (limited to 'storage/src/tests/storageserver')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
1 files changed, 30 insertions, 3 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 29ff780807f..aa0c0667b0d 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -146,6 +146,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure);
+ CPPUNIT_TEST(backpressure_evicts_all_queued_merges);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -177,6 +178,7 @@ public:
void testNewClusterStateAbortsAllOutdatedActiveMerges();
void backpressure_busy_bounces_merges_for_configured_duration();
void source_only_merges_are_not_affected_by_backpressure();
+ void backpressure_evicts_all_queued_merges();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -193,6 +195,8 @@ private:
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+
+ void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
};
const int MergeThrottlerTest::_storageNodeCount;
@@ -1586,7 +1590,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(),
+ sendAndExpectReply(MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY);
@@ -1596,7 +1600,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio
_servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds
// Backpressure has now been lifted. New merges should be forwarded
// to next node in chain as expected instead of being bounced with a reply.
- sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ sendMerge(MergeBuilder(bucket));
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active());
@@ -1608,12 +1612,35 @@ void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() {
_throttlers[2]->apply_timed_backpressure();
document::BucketId bucket(16, 6789);
- _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create());
+ _topLinks[2]->sendDown(MergeBuilder(bucket).chain(0, 1).source_only(2).create());
_topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
}
+void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
+ std::size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
+ for (std::size_t i = 0; i < max_pending + queued_count; ++i) {
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ }
+
+ // Wait till we have max_pending merge forwards and queued_count enqueued.
+ _topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
+ waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);
+}
+
+void MergeThrottlerTest::backpressure_evicts_all_queued_merges() {
+ _servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
+
+ fill_throttler_queue_with_n_commands(0, 1);
+ _topLinks[0]->getRepliesOnce(); // Clear all forwarded merges
+ _throttlers[0]->apply_timed_backpressure();
+
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+ auto reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
+}
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage