diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 13:49:05 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-08-30 13:49:05 +0000 |
commit | 605a60c9a2c95d7a67e674995a455c5c5f7f3545 (patch) | |
tree | cf796cdd42f829a764fa3bcc3e2866047eebe2f2 /storage/src/tests/storageserver/mergethrottlertest.cpp | |
parent | 75e055d4bb57d3323038fca456b8ec963d1994f9 (diff) |
Add backpressure to MergeThrottler with configurable timing
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 61 |
1 files changed, 46 insertions, 15 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 559d54d3620..ec16b76bf9a 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,10 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <cppunit/extensions/HelperMacros.h> -#include <memory> -#include <iterator> -#include <vector> -#include <algorithm> -#include <ctime> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <tests/common/testhelper.h> @@ -16,6 +11,12 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/util/exceptions.h> +#include <memory> +#include <iterator> +#include <vector> +#include <algorithm> +#include <chrono> +#include <thread> using namespace document; using namespace storage::api; @@ -24,8 +25,7 @@ namespace storage { namespace { -struct MergeBuilder -{ +struct MergeBuilder { document::BucketId _bucket; api::Timestamp _maxTimestamp; std::vector<uint16_t> _nodes; @@ -41,6 +41,8 @@ struct MergeBuilder nodes(0, 1, 2); } + ~MergeBuilder(); + MergeBuilder& nodes(uint16_t n0) { _nodes.push_back(n0); return *this; @@ -97,6 +99,8 @@ struct MergeBuilder } }; +MergeBuilder::~MergeBuilder() {} + std::shared_ptr<api::SetSystemStateCommand> makeSystemStateCmd(const std::string& state) { @@ -106,8 +110,7 @@ makeSystemStateCmd(const std::string& state) } // anon ns -class MergeThrottlerTest : public CppUnit::TestFixture -{ +class MergeThrottlerTest : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(MergeThrottlerTest); CPPUNIT_TEST(testMergesConfig); CPPUNIT_TEST(testChain); @@ -133,6 +136,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected); CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); + CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration); CPPUNIT_TEST_SUITE_END(); public: void setUp() override; @@ -162,6 +166,7 @@ public: void testGetBucketDiffCommandNotInActiveSetIsRejected(); void testApplyBucketDiffCommandNotInActiveSetIsRejected(); void testNewClusterStateAbortsAllOutdatedActiveMerges(); + void backpressure_busy_bounces_merges_for_configured_duration(); private: static const int _storageNodeCount = 3; static const int _messageWaitTime = 100; @@ -247,7 +252,7 @@ checkChain(const StorageMessage::SP& msg, void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout) { - std::time_t start = std::time(0); + const auto start = std::chrono::steady_clock::now(); while (true) { std::size_t count; { @@ -257,14 +262,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou if (count == sz) { break; } - std::time_t now = std::time(0); - if (now - start > timeout) { + auto now = std::chrono::steady_clock::now(); + if (now - start > std::chrono::seconds(timeout)) { std::ostringstream os; os << "Timeout while waiting for merge queue with " << sz << " items. Had " << count << " at timeout."; throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC); } - FastOS_Thread::Sleep(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -1491,8 +1496,7 @@ MergeThrottlerTest::sendAndExpectReply( _topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime); StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage( expectedReplyType)); - api::StorageReply& storageReply( - dynamic_cast<api::StorageReply&>(*reply)); + auto& storageReply = dynamic_cast<api::StorageReply&>(*reply); CPPUNIT_ASSERT_EQUAL(expectedResultCode, storageReply.getResult().getResult()); } @@ -1561,6 +1565,33 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() } } +void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() { + _servers[0]->getClock().setAbsoluteTimeInSeconds(1000); + _throttlers[0]->applyTimedBackpressure(); + + document::BucketId bucket(16, 6789); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(), + api::MessageType::MERGEBUCKET_REPLY, + api::ReturnCode::BUSY); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue()); + + _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds + // Backpressure has now been lifted. New merges should be forwarded + // to next node in chain as expected instead of being bounced with a reply. + sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); +} + +// TODO test source only merges allowed through + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage |