aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 13:49:05 +0000
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-08-30 13:49:05 +0000
commit605a60c9a2c95d7a67e674995a455c5c5f7f3545 (patch)
treecf796cdd42f829a764fa3bcc3e2866047eebe2f2 /storage/src/tests/storageserver/mergethrottlertest.cpp
parent75e055d4bb57d3323038fca456b8ec963d1994f9 (diff)
Add backpressure to MergeThrottler with configurable timing
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp61
1 files changed, 46 insertions, 15 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 559d54d3620..ec16b76bf9a 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,10 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <cppunit/extensions/HelperMacros.h>
-#include <memory>
-#include <iterator>
-#include <vector>
-#include <algorithm>
-#include <ctime>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <tests/common/testhelper.h>
@@ -16,6 +11,12 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <memory>
+#include <iterator>
+#include <vector>
+#include <algorithm>
+#include <chrono>
+#include <thread>
using namespace document;
using namespace storage::api;
@@ -24,8 +25,7 @@ namespace storage {
namespace {
-struct MergeBuilder
-{
+struct MergeBuilder {
document::BucketId _bucket;
api::Timestamp _maxTimestamp;
std::vector<uint16_t> _nodes;
@@ -41,6 +41,8 @@ struct MergeBuilder
nodes(0, 1, 2);
}
+ ~MergeBuilder();
+
MergeBuilder& nodes(uint16_t n0) {
_nodes.push_back(n0);
return *this;
@@ -97,6 +99,8 @@ struct MergeBuilder
}
};
+MergeBuilder::~MergeBuilder() {}
+
std::shared_ptr<api::SetSystemStateCommand>
makeSystemStateCmd(const std::string& state)
{
@@ -106,8 +110,7 @@ makeSystemStateCmd(const std::string& state)
} // anon ns
-class MergeThrottlerTest : public CppUnit::TestFixture
-{
+class MergeThrottlerTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(MergeThrottlerTest);
CPPUNIT_TEST(testMergesConfig);
CPPUNIT_TEST(testChain);
@@ -133,6 +136,7 @@ class MergeThrottlerTest : public CppUnit::TestFixture
CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected);
CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
+ CPPUNIT_TEST(backpressure_busy_bounces_merges_for_configured_duration);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() override;
@@ -162,6 +166,7 @@ public:
void testGetBucketDiffCommandNotInActiveSetIsRejected();
void testApplyBucketDiffCommandNotInActiveSetIsRejected();
void testNewClusterStateAbortsAllOutdatedActiveMerges();
+ void backpressure_busy_bounces_merges_for_configured_duration();
private:
static const int _storageNodeCount = 3;
static const int _messageWaitTime = 100;
@@ -247,7 +252,7 @@ checkChain(const StorageMessage::SP& msg,
void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
{
- std::time_t start = std::time(0);
+ const auto start = std::chrono::steady_clock::now();
while (true) {
std::size_t count;
{
@@ -257,14 +262,14 @@ void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeou
if (count == sz) {
break;
}
- std::time_t now = std::time(0);
- if (now - start > timeout) {
+ auto now = std::chrono::steady_clock::now();
+ if (now - start > std::chrono::seconds(timeout)) {
std::ostringstream os;
os << "Timeout while waiting for merge queue with " << sz << " items. Had "
<< count << " at timeout.";
throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC);
}
- FastOS_Thread::Sleep(1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@@ -1491,8 +1496,7 @@ MergeThrottlerTest::sendAndExpectReply(
_topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime);
StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage(
expectedReplyType));
- api::StorageReply& storageReply(
- dynamic_cast<api::StorageReply&>(*reply));
+ auto& storageReply = dynamic_cast<api::StorageReply&>(*reply);
CPPUNIT_ASSERT_EQUAL(expectedResultCode,
storageReply.getResult().getResult());
}
@@ -1561,6 +1565,33 @@ MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
}
}
+void MergeThrottlerTest::backpressure_busy_bounces_merges_for_configured_duration() {
+ _servers[0]->getClock().setAbsoluteTimeInSeconds(1000);
+ _throttlers[0]->applyTimedBackpressure();
+
+ document::BucketId bucket(16, 6789);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ sendAndExpectReply(MergeBuilder(bucket).clusterStateVersion(10).create(),
+ api::MessageType::MERGEBUCKET_REPLY,
+ api::ReturnCode::BUSY);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().local.failures.busy.getValue());
+
+ _servers[0]->getClock().addSecondsToTime(15); // Test-config has duration set to 15 seconds
+ // Backpressure has now been lifted. New merges should be forwarded
+ // to next node in chain as expected instead of being bounced with a reply.
+ sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
+}
+
+// TODO test source only merges allowed through
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage