summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2018-02-14 17:58:12 +0100
committerGitHub <noreply@github.com>2018-02-14 17:58:12 +0100
commit374122e84540bfe867437bde39e890db349d0221 (patch)
tree9c121b584d0e5898906372b2c6af1e7a00489933
parentfcb0a22a4ad2c57252bd93efc59287a9ef810644 (diff)
parent53571d9be1b7073dd3e135d7878b37ff0f008011 (diff)
Merge pull request #5040 from vespa-engine/toregge/cleanup-bucketdbupdater-test
Add messageCount() method to help calculate number of expected messages.
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp86
1 files changed, 45 insertions, 41 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 92495b37494..77b661c2606 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -200,6 +200,10 @@ protected:
std::vector<document::BucketSpace> _bucketSpaces;
+ size_t messageCount(size_t messagesPerBucketSpace) const {
+ return messagesPerBucketSpace * _bucketSpaces.size();
+ }
+
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
void setUp() override {
@@ -416,7 +420,7 @@ public:
setSystemState(newState);
- for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) {
+ for (uint32_t i=0; i< messageCount(numStorageNodes); i++) {
CPPUNIT_ASSERT(_sender.commands[i]->getType() ==
MessageType::REQUESTBUCKETINFO);
@@ -434,7 +438,7 @@ public:
"distributor:1 storage:%d", numStorageNodes));
lib::ClusterState newState(state);
- for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) {
+ for (uint32_t i=0; i< messageCount(numStorageNodes); i++) {
fakeBucketReply(newState, *_sender.commands[i], numBuckets);
}
assertCorrectBuckets(numBuckets, state);
@@ -622,7 +626,7 @@ BucketDBUpdaterTest::testNormalUsage()
{
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
// Ensure distribution hash is set correctly
CPPUNIT_ASSERT_EQUAL(
@@ -640,12 +644,12 @@ BucketDBUpdaterTest::testNormalUsage()
// change is only implemented after completion of previous cluster state
setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
// Expect reply of first set SystemState request.
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- 3 * _bucketSpaces.size(), 10);
+ messageCount(3), 10);
assertCorrectBuckets(10, "distributor:2 storage:3");
}
@@ -656,9 +660,9 @@ BucketDBUpdaterTest::testDistributorChange()
// First sends request
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- 3 * _bucketSpaces.size(), numBuckets);
+ messageCount(3), numBuckets);
_sender.clear();
// No change from initializing to up (when done with last job)
@@ -674,9 +678,9 @@ BucketDBUpdaterTest::testDistributorChange()
// Removing distributor. Need to refetch new data from all nodes.
setSystemState(lib::ClusterState("distributor:2 storage:3"));
- CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"),
- 3 * _bucketSpaces.size(), numBuckets);
+ messageCount(3), numBuckets);
_sender.clear();
assertCorrectBuckets(numBuckets, "distributor:2 storage:3");
}
@@ -689,9 +693,9 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
int numBuckets = 100;
setSystemState(lib::ClusterState("distributor:6 storage:6"));
- CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"),
- 6 * _bucketSpaces.size(), numBuckets);
+ messageCount(6), numBuckets);
_sender.clear();
// Distributor going down in other group, no change
@@ -711,7 +715,7 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
// Changed grouping cause change
setDistribution(getDistConfig6Nodes4Groups());
- CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
}
void
@@ -778,7 +782,7 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo()
}
// Should be resent.
- CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(2 * _bucketSpaces.size()),
+ CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(messageCount(2)),
_sender.getCommands());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
@@ -947,12 +951,12 @@ BucketDBUpdaterTest::testInitializingWhileRecheck()
lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1");
setSystemState(systemState);
- CPPUNIT_ASSERT_EQUAL(2 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(2), _sender.commands.size());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
- for (uint32_t i = 0; i < 2 * _bucketSpaces.size(); ++i) {
+ for (uint32_t i = 0; i < messageCount(2); ++i) {
fakeBucketReply(systemState, *_sender.commands[i], 100);
}
@@ -1714,7 +1718,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
api::Timestamp(1)));
- CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size());
sortSentMessagesByIndex(sender);
@@ -2030,7 +2034,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
lib::ClusterState stateAfter("distributor:3 storage:3");
{
- uint32_t expectedMsgs = 2 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = messageCount(2), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn);
}
CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
@@ -2084,24 +2088,24 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
{
lib::ClusterState stateBefore("distributor:6 storage:6");
{
- uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
}
_sender.clear();
std::string distConfig(getDistConfig6Nodes3Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
- CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
// Suddenly, a wild cluster state change appears! Even though this state
// does not in itself imply any bucket changes, it will still overwrite the
// pending cluster state and thus its state of pending bucket info requests.
setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6"));
- CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(12), _sender.commands.size());
- // Send replies for first 6 * _bucketSpaces.size() (outdated requests).
+ // Send replies for first messageCount(6) (outdated requests).
int numBuckets = 10;
- for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ for (uint32_t i = 0; i < messageCount(6); ++i) {
fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
*_sender.commands[i], numBuckets);
}
@@ -2109,9 +2113,9 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
assertCorrectBuckets(1, "distributor:6 storage:6");
// Send for current pending.
- for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ for (uint32_t i = 0; i < messageCount(6); ++i) {
fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
- *_sender.commands[i + 6 * _bucketSpaces.size()],
+ *_sender.commands[i + messageCount(6)],
numBuckets);
}
assertCorrectBuckets(numBuckets, "distributor:6 storage:6");
@@ -2125,7 +2129,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
void
BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
{
- setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6 * _bucketSpaces.size(), 20);
+ setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20);
_sender.clear();
// First cluster state; implicit scan of all buckets which does not
// use normal recovery mode ticking-path.
@@ -2137,9 +2141,9 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
// No replies received yet, still no recovery mode.
CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
- CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
uint32_t numBuckets = 10;
- for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ for (uint32_t i = 0; i < messageCount(6); ++i) {
fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
*_sender.commands[i], numBuckets);
}
@@ -2268,7 +2272,7 @@ BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch()
CPPUNIT_ASSERT_EQUAL(
expandNodeVec({0, 1, 2, 3, 4, 5}),
getSentNodesWithPreemption("version:1 distributor:6 storage:6",
- 6 * _bucketSpaces.size(),
+ messageCount(6),
"version:2 distributor:6 .5.s:d storage:6",
"version:3 distributor:6 storage:6"));
}
@@ -2280,7 +2284,7 @@ BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch()
expandNodeVec({2, 3}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6 .2.s:d",
- 5 * _bucketSpaces.size(),
+ messageCount(5),
"version:2 distributor:6 storage:6 .2.s:d .3.s:d",
"version:3 distributor:6 storage:6"));
}
@@ -2292,7 +2296,7 @@ BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched()
expandNodeVec({2}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6",
- 6 * _bucketSpaces.size(),
+ messageCount(6),
"version:2 distributor:6 storage:6 .2.s:d",
"version:3 distributor:6 storage:6"));
}
@@ -2304,7 +2308,7 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState()
nodeVec{},
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6 .2.s:d",
- 5 * _bucketSpaces.size(),
+ messageCount(5),
"version:2 distributor:6 storage:6", // Sends to 2.
"version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
}
@@ -2318,7 +2322,7 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState()
expandNodeVec({0, 1, 2, 3, 4, 5}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:100",
- 100 * _bucketSpaces.size(),
+ messageCount(100),
"version:2 distributor:5 .4.s:d storage:100",
"version:3 distributor:6 storage:6"));
}
@@ -2328,7 +2332,7 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion()
{
lib::ClusterState stateBefore(
"version:1 distributor:6 storage:6 .1.t:1234");
- uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 10;
+ uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 10;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
_sender.clear();
// New cluster state that should not by itself trigger any new fetches,
@@ -2383,7 +2387,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch()
void
BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
{
- uint32_t expectedMsgs = 3 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = messageCount(3), dummyBucketsToReturn = 1;
setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"),
expectedMsgs, dummyBucketsToReturn);
_sender.clear();
@@ -2411,7 +2415,7 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
// Attempt to apply state with {0, 1} set. This will compare the new state
// with the previous state, which still has node 2.
- expectedMsgs = 2 * _bucketSpaces.size();
+ expectedMsgs = messageCount(2);
setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"),
expectedMsgs, dummyBucketsToReturn);
@@ -2453,7 +2457,7 @@ BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer()
void
BucketDBUpdaterTest::transition_time_tracked_for_single_state_change()
{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size());
+ completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2));
CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis());
}
@@ -2461,8 +2465,8 @@ BucketDBUpdaterTest::transition_time_tracked_for_single_state_change()
void
BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes()
{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size());
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1 * _bucketSpaces.size());
+ completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2));
+ completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1));
CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis());
}
@@ -2471,13 +2475,13 @@ void
BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change()
{
lib::ClusterState state("distributor:2 storage:2");
- setAndEnableClusterState(state, 2 * _bucketSpaces.size(), 1);
+ setAndEnableClusterState(state, messageCount(2), 1);
_sender.clear();
std::string distConfig(getDistConfig3Nodes1Group());
setDistribution(distConfig);
getClock().addSecondsToTime(4);
- completeBucketInfoGathering(state, 2 * _bucketSpaces.size());
+ completeBucketInfoGathering(state, messageCount(2));
CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis());
}
@@ -2491,7 +2495,7 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions()
// Pre-empted with new state here, which will push out the old pending
// state and replace it with a new one. We should still count the time
// used processing the old state.
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3 * _bucketSpaces.size());
+ completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3));
CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis());
}