diff options
Diffstat (limited to 'storage/src/tests/distributor/pendingmessagetrackertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/pendingmessagetrackertest.cpp | 139 |
1 files changed, 83 insertions, 56 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index cda9a9e3782..fa36b4e6305 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -13,12 +13,10 @@ namespace storage { namespace distributor { -// Workaround typedef for not (yet) running with --std=c++14 which supports -// user defined literals. Once we do, replace ms(123) with 123ms. -using ms = std::chrono::milliseconds; +using namespace std::chrono_literals; -class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture { - CPPUNIT_TEST_SUITE(PendingMessageTrackerCallback_Test); +class PendingMessageTrackerTest : public CppUnit::TestFixture { + CPPUNIT_TEST_SUITE(PendingMessageTrackerTest); CPPUNIT_TEST(testSimple); CPPUNIT_TEST(testMultipleMessages); CPPUNIT_TEST(testStartPage); @@ -36,6 +34,8 @@ class PendingMessageTrackerCallback_Test : public CppUnit::TestFixture { CPPUNIT_TEST(timeTravellingClockLatenciesNotRegistered); CPPUNIT_TEST(statsSnapshotIncludesAllNodes); CPPUNIT_TEST(latencyProviderForwardsToImplementation); + CPPUNIT_TEST(busy_reply_marks_node_as_busy); + CPPUNIT_TEST(busy_node_duration_can_be_adjusted); CPPUNIT_TEST_SUITE_END(); public: @@ -56,6 +56,8 @@ public: void timeTravellingClockLatenciesNotRegistered(); void statsSnapshotIncludesAllNodes(); void latencyProviderForwardsToImplementation(); + void busy_reply_marks_node_as_busy(); + void busy_node_duration_can_be_adjusted(); private: void insertMessages(PendingMessageTracker& tracker); @@ -122,10 +124,12 @@ public: } void sendPutReply(api::PutCommand& putCmd, - const RequestBuilder& builder) + const RequestBuilder& builder, + const api::ReturnCode& result = api::ReturnCode()) { assignMockedTime(builder.atTime()); auto putReply = putCmd.makeReply(); + putReply->setResult(result); _tracker->reply(*putReply); } @@ -149,8 +153,8 @@ public: void sendPutAndReplyWithLatency(uint16_t node, std::chrono::milliseconds latency) { - auto put = sendPut(RequestBuilder().atTime(ms(1000)).toNode(node)); - sendPutReply(*put, RequestBuilder().atTime(ms(1000) + latency)); + auto put = sendPut(RequestBuilder().atTime(1000ms).toNode(node)); + sendPutReply(*put, RequestBuilder().atTime(1000ms + latency)); } OperationStats getNodePutOperationStats(uint16_t node) { @@ -158,6 +162,7 @@ public: } PendingMessageTracker& tracker() { return *_tracker; } + auto& clock() { return _clock; } private: std::string createDummyIdString(const document::BucketId& bucket) const { @@ -215,17 +220,16 @@ Fixture::Fixture() _clock.setAbsoluteTimeInSeconds(1); // Have to set clock in compReg before constructing tracker, or it'll // flip out and die on an explicit nullptr check. - _tracker = std::unique_ptr<PendingMessageTracker>( - new PendingMessageTracker(_compReg)); + _tracker = std::make_unique<PendingMessageTracker>(_compReg); } Fixture::~Fixture() {} } -CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerCallback_Test); +CPPUNIT_TEST_SUITE_REGISTRATION(PendingMessageTrackerTest); void -PendingMessageTrackerCallback_Test::testSimple() +PendingMessageTrackerTest::testSimple() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -269,7 +273,7 @@ PendingMessageTrackerCallback_Test::testSimple() } void -PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracker) +PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) { for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; @@ -294,7 +298,7 @@ PendingMessageTrackerCallback_Test::insertMessages(PendingMessageTracker& tracke } void -PendingMessageTrackerCallback_Test::testStartPage() +PendingMessageTrackerTest::testStartPage() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -318,7 +322,7 @@ PendingMessageTrackerCallback_Test::testStartPage() } void -PendingMessageTrackerCallback_Test::testMultipleMessages() +PendingMessageTrackerTest::testMultipleMessages() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -419,7 +423,7 @@ public: } void -PendingMessageTrackerCallback_Test::testGetPendingMessageTypes() +PendingMessageTrackerTest::testGetPendingMessageTypes() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -456,7 +460,7 @@ PendingMessageTrackerCallback_Test::testGetPendingMessageTypes() } void -PendingMessageTrackerCallback_Test::testHasPendingMessage() +PendingMessageTrackerTest::testHasPendingMessage() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -508,7 +512,7 @@ public: } // anon ns void -PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket() +PendingMessageTrackerTest::testGetAllMessagesForSingleBucket() { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; @@ -535,10 +539,10 @@ PendingMessageTrackerCallback_Test::testGetAllMessagesForSingleBucket() } void -PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed() +PendingMessageTrackerTest::nodeStatsCanBeOutputStreamed() { NodeStats stats; - stats.puts = makeOpStats(ms(56789), 10); + stats.puts = makeOpStats(56789ms, 10); std::ostringstream os; os << stats; @@ -550,120 +554,143 @@ PendingMessageTrackerCallback_Test::nodeStatsCanBeOutputStreamed() } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsInitiallyZero() +PendingMessageTrackerTest::totalPutLatencyIsInitiallyZero() { Fixture fixture; - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsNotAlteredBeforeReplyReceived() +PendingMessageTrackerTest::statsNotAlteredBeforeReplyReceived() { Fixture fixture; - fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsTrackedForSingleRequest() +PendingMessageTrackerTest::totalPutLatencyIsTrackedForSingleRequest() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); + fixture.sendPutAndReplyWithLatency(0, 500ms); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsAreTrackedSeparatelyPerNode() +PendingMessageTrackerTest::statsAreTrackedSeparatelyPerNode() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(1, ms(600)); + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(1, 600ms); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), fixture.getNodePutOperationStats(0)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), fixture.getNodePutOperationStats(1)); } // Necessarily, this test will have to be altered when we add tracking of // other message types as well. void -PendingMessageTrackerCallback_Test::onlyPutMessagesAreTracked() +PendingMessageTrackerTest::onlyPutMessagesAreTracked() { Fixture fixture; auto remove = fixture.sendRemove( - RequestBuilder().atTime(ms(1000)).toNode(0)); - fixture.sendRemoveReply(*remove, RequestBuilder().atTime(ms(2000))); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 0), + RequestBuilder().atTime(1000ms).toNode(0)); + fixture.sendRemoveReply(*remove, RequestBuilder().atTime(2000ms)); + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 0), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::totalPutLatencyIsAggregatedAcrossRequests() +PendingMessageTrackerTest::totalPutLatencyIsAggregatedAcrossRequests() { Fixture fixture; // Model 2 concurrent puts to node 0. - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(0, ms(600)); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(1100), 2), + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(0, 600ms); + CPPUNIT_ASSERT_EQUAL(makeOpStats(1100ms, 2), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::clearingMessagesDoesNotAffectStats() +PendingMessageTrackerTest::clearingMessagesDoesNotAffectStats() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(2, ms(2000)); + fixture.sendPutAndReplyWithLatency(2, 2000ms); fixture.tracker().clearMessagesForNode(2); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(2000), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(2000ms, 1), fixture.getNodePutOperationStats(2)); } void -PendingMessageTrackerCallback_Test::timeTravellingClockLatenciesNotRegistered() +PendingMessageTrackerTest::timeTravellingClockLatenciesNotRegistered() { Fixture fixture; - auto put = fixture.sendPut(RequestBuilder().atTime(ms(1000)).toNode(0)); - fixture.sendPutReply(*put, RequestBuilder().atTime(ms(999))); + auto put = fixture.sendPut(RequestBuilder().atTime(1000ms).toNode(0)); + fixture.sendPutReply(*put, RequestBuilder().atTime(999ms)); // Latency increase of zero, but we do count the request itself. - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(0), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(0ms, 1), fixture.getNodePutOperationStats(0)); } void -PendingMessageTrackerCallback_Test::statsSnapshotIncludesAllNodes() +PendingMessageTrackerTest::statsSnapshotIncludesAllNodes() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); - fixture.sendPutAndReplyWithLatency(1, ms(600)); + fixture.sendPutAndReplyWithLatency(0, 500ms); + fixture.sendPutAndReplyWithLatency(1, 600ms); NodeStatsSnapshot snapshot = fixture.tracker().getLatencyStatistics(); CPPUNIT_ASSERT_EQUAL(size_t(2), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), snapshot.nodeToStats[0].puts); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(600), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(600ms, 1), snapshot.nodeToStats[1].puts); } void -PendingMessageTrackerCallback_Test::latencyProviderForwardsToImplementation() +PendingMessageTrackerTest::latencyProviderForwardsToImplementation() { Fixture fixture; - fixture.sendPutAndReplyWithLatency(0, ms(500)); + fixture.sendPutAndReplyWithLatency(0, 500ms); LatencyStatisticsProvider& provider( fixture.tracker().getLatencyStatisticsProvider()); NodeStatsSnapshot snapshot = provider.getLatencyStatistics(); CPPUNIT_ASSERT_EQUAL(size_t(1), snapshot.nodeToStats.size()); - CPPUNIT_ASSERT_EQUAL(makeOpStats(ms(500), 1), + CPPUNIT_ASSERT_EQUAL(makeOpStats(500ms, 1), snapshot.nodeToStats[0].puts); } +// TODO don't set busy for visitor replies? These will mark the node as busy today, +// but have the same actual semantics as busy merges (i.e. "queue is full", not "node +// is too busy to accept new requests in general"). + +void PendingMessageTrackerTest::busy_reply_marks_node_as_busy() { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0)); + f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); + CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0)); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(1)); +} + +void PendingMessageTrackerTest::busy_node_duration_can_be_adjusted() { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + f.tracker().setNodeBusyDuration(std::chrono::seconds(10)); + f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); + CPPUNIT_ASSERT(f.tracker().getNodeInfo().isBusy(0)); + f.clock().addSecondsToTime(11); + CPPUNIT_ASSERT(!f.tracker().getNodeInfo().isBusy(0)); +} + } // distributor } // storage |