summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/pendingmessagetrackertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/pendingmessagetrackertest.cpp')
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp139
1 files changed, 83 insertions, 56 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index cda9a9e3782..fa36b4e6305 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -13,12 +13,10 @@
namespace storage {
namespace distributor {
-// Workaround typedef for not (yet) running with --std=c++14 which supports
-// user defined literals. Once we do, replace ms(123) with 123ms.
-using ms = std::chrono::milliseconds;
+using namespace std::chrono_literals;
-class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
- CPPUNIT_TEST_SUITE(PendingMessageTrackerCallback_Test);
+class PendingMessageTrackerTest : public CppUnit::TestFixture {
+ CPPUNIT_TEST_SUITE(PendingMessageTrackerTest);
CPPUNIT_TEST(testSimple);
CPPUNIT_TEST(testMultipleMessages);
CPPUNIT_TEST(testStartPage);
@@ -36,6 +34,8 @@ class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture {
CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered);
CPPUNIT_TEST(statsSnapshotIncludesAllNodes);
CPPUNIT_TEST(latencyProviderForwardsToImplementation);
+ CPPUNIT_TEST(busy_reply_marks_node_as_busy);
+ CPPUNIT_TEST(busy_node_duration_can_be_adjusted);
CPPUNIT_TEST_SUITE_END();
public:
@@ -56,6 +56,8 @@ public:
void timeTravellingClockLatenciesNotRegistered();
void statsSnapshotIncludesAllNodes();
void latencyProviderForwardsToImplementation();
+ void busy_reply_marks_node_as_busy();
+ void busy_node_duration_can_be_adjusted();
private:
void insertMessages(PendingMessageTracker& tracker);
@@ -122,10 +124,12 @@ public:
}
void sendPutReply(api::PutCommand& putCmd,
- const RequestBuilder& builder)
+ const RequestBuilder& builder,
+ const api::ReturnCode& result = api::ReturnCode())
{
assignMockedTime(builder.atTime());
auto putReply = putCmd.makeReply();
+ putReply->setResult(result);
_tracker->reply(*putReply);
}
@@ -149,8 +153,8 @@ public:
void sendPutAndReplyWithLatency(uint16_t node,
std::chrono::milliseconds latency)
{
- auto put = sendPut(RequestBuilder().atTime(ms(1000)).toNode(node));
- sendPutReply(*put, RequestBuilder().atTime(ms(1000) + latency));
+ auto put = sendPut(RequestBuilder().atTime(1000ms).toNode(node));
+ sendPutReply(*put, RequestBuilder().atTime(1000ms + latency));
}
OperationStats getNodePutOperationStats(uint16_t node) {
@@ -158,6 +162,7 @@ public:
}
PendingMessageTracker& tracker() { return *_tracker; }
+ auto& clock() { return _clock; }
private:
std::string createDummyIdString(const document::BucketId& bucket) const {
@@ -215,17 +220,16 @@ Fixture::Fixture()
_clock.setAbsoluteTimeInSeconds(1);
// Have to set clock in compReg before constructing tracker, or it'll
// flip out and die on an explicit nullptr check.
- _tracker = std::unique_ptr<PendingMessageTracker>(
- new PendingMessageTracker(_compReg));
+ _tracker = std::make_unique<PendingMessageTracker>(_compReg);
}
Fixture::~Fixture() {}
}
-CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerCallback_Test);
+CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerTest);
void
-PendingMessageTrackerCallback_Test::testSimple()
+PendingMessageTrackerTest::testSimple()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -269,7 +273,7 @@ PendingMessageTrackerCallback_Test::testSimple()
}
void
-PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracker)
+PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
{
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
@@ -294,7 +298,7 @@ PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracke
}
void
-PendingMessageTrackerCallback_Test::testStartPage()
+PendingMessageTrackerTest::testStartPage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -318,7 +322,7 @@ PendingMessageTrackerCallback_Test::testStartPage()
}
void
-PendingMessageTrackerCallback_Test::testMultipleMessages()
+PendingMessageTrackerTest::testMultipleMessages()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -419,7 +423,7 @@ public:
}
void
-PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
+PendingMessageTrackerTest::testGetPendingMessageTypes()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -456,7 +460,7 @@ PendingMessageTrackerCallback_Test::testGetPendingMessageTypes()
}
void
-PendingMessageTrackerCallback_Test::testHasPendingMessage()
+PendingMessageTrackerTest::testHasPendingMessage()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -508,7 +512,7 @@ public:
} // anon ns
void
-PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
+PendingMessageTrackerTest::testGetAllMessagesForSingleBucket()
{
StorageComponentRegisterImpl compReg;
framework::defaultimplementation::FakeClock clock;
@@ -535,10 +539,10 @@ PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket()
}
void
-PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
+PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed()
{
NodeStats stats;
- stats.puts = makeOpStats(ms(56789), 10);
+ stats.puts = makeOpStats(56789ms, 10);
std::ostringstream os;
os << stats;
@@ -550,120 +554,143 @@ PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed()
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsInitiallyZero()
+PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero()
{
Fixture fixture;
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsNotAlteredBeforeReplyReceived()
+PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived()
{
Fixture fixture;
- fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsTrackedForSingleRequest()
+PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsAreTrackedSeparatelyPerNode()
+PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
fixture.getNodePutOperationStats(0));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
fixture.getNodePutOperationStats(1));
}
// Necessarily, this test will have to be altered when we add tracking of
// other message types as well.
void
-PendingMessageTrackerCallback_Test::onlyPutMessagesAreTracked()
+PendingMessageTrackerTest::onlyPutMessagesAreTracked()
{
Fixture fixture;
auto remove = fixture.sendRemove(
- RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendRemoveReply(*remove, RequestBuilder().atTime(ms(2000)));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0),
+ RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms));
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::totalPutLatencyIsAggregatedAcrossRequests()
+PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests()
{
Fixture fixture;
// Model 2 concurrent puts to node 0.
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(0, ms(600));
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(1100), 2),
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(0, 600ms);
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::clearingMessagesDoesNotAffectStats()
+PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(2, ms(2000));
+ fixture.sendPutAndReplyWithLatency(2, 2000ms);
fixture.tracker().clearMessagesForNode(2);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(2000), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1),
fixture.getNodePutOperationStats(2));
}
void
-PendingMessageTrackerCallback_Test::timeTravellingClockLatenciesNotRegistered()
+PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered()
{
Fixture fixture;
- auto put = fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0));
- fixture.sendPutReply(*put, RequestBuilder().atTime(ms(999)));
+ auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0));
+ fixture.sendPutReply(*put, RequestBuilder().atTime(999ms));
// Latency increase of zero, but we do count the request itself.
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1),
fixture.getNodePutOperationStats(0));
}
void
-PendingMessageTrackerCallback_Test::statsSnapshotIncludesAllNodes()
+PendingMessageTrackerTest::statsSnapshotIncludesAllNodes()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
- fixture.sendPutAndReplyWithLatency(1, ms(600));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
+ fixture.sendPutAndReplyWithLatency(1, 600ms);
NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1),
snapshot.nodeToStats[1].puts);
}
void
-PendingMessageTrackerCallback_Test::latencyProviderForwardsToImplementation()
+PendingMessageTrackerTest::latencyProviderForwardsToImplementation()
{
Fixture fixture;
- fixture.sendPutAndReplyWithLatency(0, ms(500));
+ fixture.sendPutAndReplyWithLatency(0, 500ms);
LatencyStatisticsProvider& provider(
fixture.tracker().getLatencyStatisticsProvider());
NodeStatsSnapshot snapshot = provider.getLatencyStatistics();
CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size());
- CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1),
+ CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1),
snapshot.nodeToStats[0].puts);
}
+// TODO don't set busy for visitor replies? These will mark the node as busy today,
+// but have the same actual semantics as busy merges (i.e. "queue is full", not "node
+// is too busy to accept new requests in general").
+
+void PendingMessageTrackerTest::busy_reply_marks_node_as_busy() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(1));
+}
+
+void PendingMessageTrackerTest::busy_node_duration_can_be_adjusted() {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ f.tracker().setNodeBusyDuration(std::chrono::seconds(10));
+ f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY));
+ CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0));
+ f.clock().addSecondsToTime(11);
+ CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0));
+}
+
} // distributor
} // storage