diff options
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 33 |
1 files changed, 30 insertions, 3 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 29ff780807f..aa0c0667b0d 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -146,6 +146,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); CPPUNIT_TEST(source_only_merges_are_not_affected_by_backpressure); + CPPUNIT_TEST(backpressure_evicts_all_queued_merges); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -177,6 +178,7 @@ public: void testNewClusterStateAbortsAllOutdatedActiveMerges(); void backpressure_busy_bounces_merges_for_configured_duration(); void source_only_merges_are_not_affected_by_backpressure(); + void backpressure_evicts_all_queued_merges(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -193,6 +195,8 @@ private: const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode); + + void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); }; const int MergeThrottlerTest::_storageNodeCount; @@ -1586,7 +1590,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); - sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(), + sendAndExpectReply(MergeBuilder(bucket).create(), api::MessageType::MERGEBUCKET_REPLY, api::ReturnCode::BUSY); @@ -1596,7 +1600,7 @@ void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duratio _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds // Backpressure has now been lifted. New merges should be forwarded // to next node in chain as expected instead of being bounced with a reply. - sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + sendMerge(MergeBuilder(bucket)); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); CPPUNIT_ASSERT(!_throttlers[0]->backpressure_mode_active()); @@ -1608,12 +1612,35 @@ void MergeThrottlerTest::source_only_merges_are_not_affected_by_backpressure() { _throttlers[2]->apply_timed_backpressure(); document::BucketId bucket(16, 6789); - _topLinks[2]->sendDown(MergeBuilder(bucket).clusterStateVersion(10).chain(0, 1).source_only(2).create()); + _topLinks[2]->sendDown(MergeBuilder(bucket).chain(0, 1).source_only(2).create()); _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); } +void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { + std::size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); + for (std::size_t i = 0; i < max_pending + queued_count; ++i) { + _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create()); + } + + // Wait till we have max_pending merge forwards and queued_count enqueued. + _topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime); +} + +void MergeThrottlerTest::backpressure_evicts_all_queued_merges() { + _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); + + fill_throttler_queue_with_n_commands(0, 1); + _topLinks[0]->getRepliesOnce(); // Clear all forwarded merges + _throttlers[0]->apply_timed_backpressure(); + + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + auto reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); +} + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage |