diff options
author | Tor Egge <Tor.Egge@oath.com> | 2018-02-14 14:44:28 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2018-02-14 14:44:28 +0000 |
commit | 53571d9be1b7073dd3e135d7878b37ff0f008011 (patch) | |
tree | a746cc0250b7bde6ad86aa17c915942a3db0e61d /storage/src | |
parent | 48cbaea4364894bae522ba8bff89305e2798fe4a (diff) |
Add messageCount() method to help calculate number of expected messages.
Diffstat (limited to 'storage/src')
-rw-r--r-- | storage/src/tests/distributor/bucketdbupdatertest.cpp | 86 |
1 files changed, 45 insertions, 41 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 92495b37494..77b661c2606 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -200,6 +200,10 @@ protected: std::vector<document::BucketSpace> _bucketSpaces; + size_t messageCount(size_t messagesPerBucketSpace) const { + return messagesPerBucketSpace * _bucketSpaces.size(); + } + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { @@ -416,7 +420,7 @@ public: setSystemState(newState); - for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) { + for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { CPPUNIT_ASSERT(_sender.commands[i]->getType() == MessageType::REQUESTBUCKETINFO); @@ -434,7 +438,7 @@ public: "distributor:1 storage:%d", numStorageNodes)); lib::ClusterState newState(state); - for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) { + for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { fakeBucketReply(newState, *_sender.commands[i], numBuckets); } assertCorrectBuckets(numBuckets, state); @@ -622,7 +626,7 @@ BucketDBUpdaterTest::testNormalUsage() { setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); // Ensure distribution hash is set correctly CPPUNIT_ASSERT_EQUAL( @@ -640,12 +644,12 @@ BucketDBUpdaterTest::testNormalUsage() // change is only implemented after completion of previous cluster state setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); // Expect reply of first set SystemState request. CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - 3 * _bucketSpaces.size(), 10); + messageCount(3), 10); assertCorrectBuckets(10, "distributor:2 storage:3"); } @@ -656,9 +660,9 @@ BucketDBUpdaterTest::testDistributorChange() // First sends request setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - 3 * _bucketSpaces.size(), numBuckets); + messageCount(3), numBuckets); _sender.clear(); // No change from initializing to up (when done with last job) @@ -674,9 +678,9 @@ BucketDBUpdaterTest::testDistributorChange() // Removing distributor. Need to refetch new data from all nodes. setSystemState(lib::ClusterState("distributor:2 storage:3")); - CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), - 3 * _bucketSpaces.size(), numBuckets); + messageCount(3), numBuckets); _sender.clear(); assertCorrectBuckets(numBuckets, "distributor:2 storage:3"); } @@ -689,9 +693,9 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() int numBuckets = 100; setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), - 6 * _bucketSpaces.size(), numBuckets); + messageCount(6), numBuckets); _sender.clear(); // Distributor going down in other group, no change @@ -711,7 +715,7 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() // Changed grouping cause change setDistribution(getDistConfig6Nodes4Groups()); - CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); } void @@ -778,7 +782,7 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo() } // Should be resent. - CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(2 * _bucketSpaces.size()), + CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(messageCount(2)), _sender.getCommands()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); @@ -947,12 +951,12 @@ BucketDBUpdaterTest::testInitializingWhileRecheck() lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); setSystemState(systemState); - CPPUNIT_ASSERT_EQUAL(2 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(2), _sender.commands.size()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - for (uint32_t i = 0; i < 2 * _bucketSpaces.size(); ++i) { + for (uint32_t i = 0; i < messageCount(2); ++i) { fakeBucketReply(systemState, *_sender.commands[i], 100); } @@ -1714,7 +1718,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); - CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size()); sortSentMessagesByIndex(sender); @@ -2030,7 +2034,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() lib::ClusterState stateAfter("distributor:3 storage:3"); { - uint32_t expectedMsgs = 2 * _bucketSpaces.size(), dummyBucketsToReturn = 1; + uint32_t expectedMsgs = messageCount(2), dummyBucketsToReturn = 1; setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn); } CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent() @@ -2084,24 +2088,24 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP { lib::ClusterState stateBefore("distributor:6 storage:6"); { - uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 1; + uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); std::string distConfig(getDistConfig6Nodes3Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); - CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); // Suddenly, a wild cluster state change appears! Even though this state // does not in itself imply any bucket changes, it will still overwrite the // pending cluster state and thus its state of pending bucket info requests. setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6")); - CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(12), _sender.commands.size()); - // Send replies for first 6 * _bucketSpaces.size() (outdated requests). + // Send replies for first messageCount(6) (outdated requests). int numBuckets = 10; - for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + for (uint32_t i = 0; i < messageCount(6); ++i) { fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), *_sender.commands[i], numBuckets); } @@ -2109,9 +2113,9 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP assertCorrectBuckets(1, "distributor:6 storage:6"); // Send for current pending. - for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + for (uint32_t i = 0; i < messageCount(6); ++i) { fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), - *_sender.commands[i + 6 * _bucketSpaces.size()], + *_sender.commands[i + messageCount(6)], numBuckets); } assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); @@ -2125,7 +2129,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP void BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() { - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6 * _bucketSpaces.size(), 20); + setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20); _sender.clear(); // First cluster state; implicit scan of all buckets which does not // use normal recovery mode ticking-path. @@ -2137,9 +2141,9 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() // No replies received yet, still no recovery mode. CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); - CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); uint32_t numBuckets = 10; - for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + for (uint32_t i = 0; i < messageCount(6); ++i) { fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), *_sender.commands[i], numBuckets); } @@ -2268,7 +2272,7 @@ BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() CPPUNIT_ASSERT_EQUAL( expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption("version:1 distributor:6 storage:6", - 6 * _bucketSpaces.size(), + messageCount(6), "version:2 distributor:6 .5.s:d storage:6", "version:3 distributor:6 storage:6")); } @@ -2280,7 +2284,7 @@ BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() expandNodeVec({2, 3}), getSentNodesWithPreemption( "version:1 distributor:6 storage:6 .2.s:d", - 5 * _bucketSpaces.size(), + messageCount(5), "version:2 distributor:6 storage:6 .2.s:d .3.s:d", "version:3 distributor:6 storage:6")); } @@ -2292,7 +2296,7 @@ BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() expandNodeVec({2}), getSentNodesWithPreemption( "version:1 distributor:6 storage:6", - 6 * _bucketSpaces.size(), + messageCount(6), "version:2 distributor:6 storage:6 .2.s:d", "version:3 distributor:6 storage:6")); } @@ -2304,7 +2308,7 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() nodeVec{}, getSentNodesWithPreemption( "version:1 distributor:6 storage:6 .2.s:d", - 5 * _bucketSpaces.size(), + messageCount(5), "version:2 distributor:6 storage:6", // Sends to 2. "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. } @@ -2318,7 +2322,7 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption( "version:1 distributor:6 storage:100", - 100 * _bucketSpaces.size(), + messageCount(100), "version:2 distributor:5 .4.s:d storage:100", "version:3 distributor:6 storage:6")); } @@ -2328,7 +2332,7 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() { lib::ClusterState stateBefore( "version:1 distributor:6 storage:6 .1.t:1234"); - uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 10; + uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 10; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); _sender.clear(); // New cluster state that should not by itself trigger any new fetches, @@ -2383,7 +2387,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() void BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() { - uint32_t expectedMsgs = 3 * _bucketSpaces.size(), dummyBucketsToReturn = 1; + uint32_t expectedMsgs = messageCount(3), dummyBucketsToReturn = 1; setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), expectedMsgs, dummyBucketsToReturn); _sender.clear(); @@ -2411,7 +2415,7 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() // Attempt to apply state with {0, 1} set. This will compare the new state // with the previous state, which still has node 2. - expectedMsgs = 2 * _bucketSpaces.size(); + expectedMsgs = messageCount(2); setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), expectedMsgs, dummyBucketsToReturn); @@ -2453,7 +2457,7 @@ BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer() void BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)); CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis()); } @@ -2461,8 +2465,8 @@ BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() void BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1 * _bucketSpaces.size()); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1)); CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis()); } @@ -2471,13 +2475,13 @@ void BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change() { lib::ClusterState state("distributor:2 storage:2"); - setAndEnableClusterState(state, 2 * _bucketSpaces.size(), 1); + setAndEnableClusterState(state, messageCount(2), 1); _sender.clear(); std::string distConfig(getDistConfig3Nodes1Group()); setDistribution(distConfig); getClock().addSecondsToTime(4); - completeBucketInfoGathering(state, 2 * _bucketSpaces.size()); + completeBucketInfoGathering(state, messageCount(2)); CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis()); } @@ -2491,7 +2495,7 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() // Pre-empted with new state here, which will push out the old pending // state and replace it with a new one. We should still count the time // used processing the old state. - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3 * _bucketSpaces.size()); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3)); CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis()); } |