From ab30a107007f91bd207980b670356b8acc50ad63 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 14 Feb 2018 09:51:22 +0000 Subject: Adjust distributor unit tests to handle global distributor bucket space. --- .../src/tests/distributor/bucketdbupdatertest.cpp | 516 +++++++++++---------- 1 file changed, 278 insertions(+), 238 deletions(-) (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp') diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index b79894aee9a..2db7be19712 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -13,18 +13,45 @@ #include #include #include +#include #include #include #include #include +#include using namespace storage::api; using namespace storage::lib; using document::test::makeDocumentBucket; using document::test::makeBucketSpace; +using document::BucketSpace; +using document::FixedBucketSpaces; namespace storage::distributor { +namespace { + +std::string +getStringList(std::string s, uint32_t count) +{ + std::ostringstream ost; + for (uint32_t i = 0; i < count; ++i) { + if (i > 0) { + ost << ","; + } + ost << s; + } + return ost.str(); +} + +std::string +getRequestBucketInfoStrings(uint32_t count) +{ + return getStringList("Request bucket info", count); +} + +} + class BucketDBUpdaterTest : public CppUnit::TestFixture, public DistributorTestUtil { @@ -87,6 +114,9 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST_SUITE_END(); +public: + BucketDBUpdaterTest(); + protected: void testNormalUsage(); void testDistributorChange(); @@ -161,10 +191,20 @@ protected: return clusterInfo; } + static std::string getNodeList(std::vector nodes, size_t count); + + std::string getNodeList(std::vector nodes); + + std::vector + expandNodeVec(const std::vector &nodes); + + std::vector _bucketSpaces; + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { createLinks(); + _bucketSpaces = getBucketSpaces(); }; void tearDown() override { @@ -173,17 +213,17 @@ public: std::shared_ptr getFakeBucketReply( const lib::ClusterState& state, - RequestBucketInfoCommand& cmd, + const RequestBucketInfoCommand& cmd, int storageIndex, - int bucketCount, - int invalidBucketCount = 0) + uint32_t bucketCount, + uint32_t invalidBucketCount = 0) { RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd); sreply->setAddress(storageAddress(storageIndex)); api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - for (int i=0; i(sreply); } - void fakeBucketReply( - const lib::ClusterState& state, - RequestBucketInfoCommand& cmd, - int storageIndex, - int bucketCount, - int invalidBucketCount = 0) + void fakeBucketReply(const lib::ClusterState &state, + const api::StorageCommand &cmd, + uint32_t bucketCount, + uint32_t invalidBucketCount = 0) { + CPPUNIT_ASSERT(cmd.getType() == MessageType::REQUESTBUCKETINFO); + const api::StorageMessageAddress &address(*cmd.getAddress()); getBucketDBUpdater().onRequestBucketInfoReply( getFakeBucketReply(state, - cmd, - storageIndex, + dynamic_cast(cmd), + address.getIndex(), bucketCount, invalidBucketCount)); } @@ -332,22 +372,15 @@ public: } void completeBucketInfoGathering(const lib::ClusterState& state, - uint32_t expectedMsgs, - uint32_t nBuckets = 1) + size_t expectedMsgs, + uint32_t bucketCount = 1, + uint32_t invalidBucketCount = 0) { - CPPUNIT_ASSERT_EQUAL(size_t(expectedMsgs), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(expectedMsgs, _sender.commands.size()); for (uint32_t i = 0; i < _sender.commands.size(); i++) { - CPPUNIT_ASSERT(_sender.commands[i]->getType() == - MessageType::REQUESTBUCKETINFO); - - const api::StorageMessageAddress& address( - *_sender.commands[i]->getAddress()); - fakeBucketReply( - state, - dynamic_cast(*_sender.commands[i]), - address.getIndex(), - nBuckets); + fakeBucketReply(state, *_sender.commands[i], + bucketCount, invalidBucketCount); } } @@ -383,12 +416,12 @@ public: setSystemState(newState); - for (uint32_t i=0; igetType() == MessageType::REQUESTBUCKETINFO); const api::StorageMessageAddress *address = _sender.commands[i]->getAddress(); - CPPUNIT_ASSERT_EQUAL(i, (uint32_t)address->getIndex()); + CPPUNIT_ASSERT_EQUAL((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex()); } } @@ -401,11 +434,8 @@ public: "distributor:1 storage:%d", numStorageNodes)); lib::ClusterState newState(state); - for (uint32_t i=0; i( *_sender.commands[0]).getDistributionHash()); - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[0]), - 0, 10); + fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + *_sender.commands[0], 10); _sender.clear(); @@ -605,17 +640,12 @@ BucketDBUpdaterTest::testNormalUsage() // change is only implemented after completion of previous cluster state setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); // Expect reply of first set SystemState request. CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[i]), - i, 10); - } - + completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + 3 * _bucketSpaces.size(), 10); assertCorrectBuckets(10, "distributor:2 storage:3"); } @@ -626,13 +656,9 @@ BucketDBUpdaterTest::testDistributorChange() // First sends request setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + 3 * _bucketSpaces.size(), numBuckets); _sender.clear(); // No change from initializing to up (when done with last job) @@ -648,13 +674,9 @@ BucketDBUpdaterTest::testDistributorChange() // Removing distributor. Need to refetch new data from all nodes. setSystemState(lib::ClusterState("distributor:2 storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 storage:3"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), + 3 * _bucketSpaces.size(), numBuckets); _sender.clear(); assertCorrectBuckets(numBuckets, "distributor:2 storage:3"); } @@ -667,13 +689,9 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() int numBuckets = 100; setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), + 6 * _bucketSpaces.size(), numBuckets); _sender.clear(); // Distributor going down in other group, no change @@ -693,7 +711,7 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() // Changed grouping cause change setDistribution(getDistConfig6Nodes4Groups()); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); } void @@ -701,16 +719,13 @@ BucketDBUpdaterTest::testNormalUsageInitializing() { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Not yet passing on system state. CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 10, - 10); + completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 10, 10); assertCorrectBuckets(10, "distributor:1 storage:1"); @@ -727,12 +742,10 @@ BucketDBUpdaterTest::testNormalUsageInitializing() setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // Send a new request bucket info up. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 20); + completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 20); // Pass on cluster state and recheck buckets now. CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); @@ -746,33 +759,34 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo() setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { - std::shared_ptr reply = - getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 10); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + std::shared_ptr reply = + getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[i].get()), + 0, + 10); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + } - reply->setResult(api::ReturnCode::NOT_CONNECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - // Trigger that delayed message is sent + // Trigger that delayed message is sent getClock().addSecondsToTime(10); getBucketDBUpdater().resendDelayedMessages(); } // Should be resent. - CPPUNIT_ASSERT_EQUAL(std::string("Request bucket info," - "Request bucket info"), + CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(2 * _bucketSpaces.size()), _sender.getCommands()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[1].get()), - 0, - 10); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *_sender.commands[_bucketSpaces.size() + i], 10); + } for (int i=0; i<10; i++) { CPPUNIT_ASSERT_EQUAL( @@ -791,14 +805,14 @@ BucketDBUpdaterTest::testEncodeErrorHandling() { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Not yet passing on system state. CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - { + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { std::shared_ptr reply = getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), + *((RequestBucketInfoCommand*)_sender.commands[i].get()), 0, 10); @@ -815,21 +829,15 @@ BucketDBUpdaterTest::testDownWhileInit() setStorageNodes(3); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 5); + *_sender.commands[0], 5); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[2].get()), - 2, - 5); + *_sender.commands[2], 5); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[1].get()), - 1, - 5); + *_sender.commands[1], 5); } bool @@ -844,6 +852,42 @@ BucketDBUpdaterTest::bucketExistsThatHasNode(int bucketCount, uint16_t node) con return false; } +std::string +BucketDBUpdaterTest::getNodeList(std::vector nodes, size_t count) +{ + std::ostringstream ost; + bool first = true; + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + if (!first) { + ost << ","; + } + ost << node; + first = false; + } + } + return ost.str(); +} + +std::string +BucketDBUpdaterTest::getNodeList(std::vector nodes) +{ + return getNodeList(std::move(nodes), _bucketSpaces.size()); +} + +std::vector +BucketDBUpdaterTest::expandNodeVec(const std::vector &nodes) +{ + std::vector res; + size_t count = _bucketSpaces.size(); + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + res.push_back(node); + } + } + return res; +} + void BucketDBUpdaterTest::testNodeDown() { @@ -903,16 +947,13 @@ BucketDBUpdaterTest::testInitializingWhileRecheck() lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); setSystemState(systemState); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(2 * _bucketSpaces.size(), _sender.commands.size()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - for (int i=0; i<2; i++) { - fakeBucketReply(systemState, - *((RequestBucketInfoCommand*)_sender.commands[i].get()), - i, - 100); + for (uint32_t i = 0; i < 2 * _bucketSpaces.size(); ++i) { + fakeBucketReply(systemState, *_sender.commands[i], 100); } // Now we can pass on system state. @@ -931,35 +972,36 @@ BucketDBUpdaterTest::testBitChange() { setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); - - CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); - - RequestBucketInfoReply* sreply = - new RequestBucketInfoReply(*((RequestBucketInfoCommand*)_sender.commands[0].get())); - sreply->setAddress(storageAddress(0)); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast(*_sender.commands[bsi]); + RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + sreply->setAddress(storageAddress(0)); + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + int cnt=0; + for (int i=0; cnt < 2; i++) { + lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); + std::vector distributors; + if (distribution.getIdealDistributorNode( + lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + document::BucketId(16, i)) + == 0) + { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); - int cnt=0; - for (int i=0; cnt < 2; i++) { - lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); - std::vector distributors; - if (distribution.getIdealDistributorNode( - lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), - document::BucketId(16, i)) - == 0) - { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); - - bucketlist.push_back(document::BucketId(16, i)); - cnt++; + bucketlist.push_back(document::BucketId(16, i)); + cnt++; + } + } } - } - getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr(sreply)); + getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr(sreply)); + } } CPPUNIT_ASSERT_EQUAL( @@ -975,29 +1017,31 @@ BucketDBUpdaterTest::testBitChange() _sender.clear(); setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); - - CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + + CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast(*_sender.commands[bsi]); + RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + sreply->setAddress(storageAddress(0)); + sreply->setResult(api::ReturnCode::OK); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + for (uint32_t i = 0; i < 3; ++i) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } - RequestBucketInfoReply* sreply = - new RequestBucketInfoReply( - *((RequestBucketInfoCommand*)_sender.commands[0].get())); - sreply->setAddress(storageAddress(0)); - sreply->setResult(api::ReturnCode::OK); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, 4), + api::BucketInfo(10,1,1))); + } - for (uint32_t i = 0; i < 3; ++i) { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); + getBucketDBUpdater().onRequestBucketInfoReply( + std::shared_ptr(sreply)); } - - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, 4), - api::BucketInfo(10,1,1))); - - getBucketDBUpdater().onRequestBucketInfoReply( - std::shared_ptr(sreply)); } CPPUNIT_ASSERT_EQUAL( @@ -1256,7 +1300,7 @@ void BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() { setSystemState(lib::ClusterState("distributor:1 storage:1")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); @@ -1266,18 +1310,16 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() getBucketDBUpdater().onNotifyBucketChange(cmd); } - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); - fakeBucketReply( - lib::ClusterState("distributor:1 storage:1"), - dynamic_cast(*_sender.commands[0]), - 0, 10); + completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"), + _bucketSpaces.size(), 10); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size() + 1, _sender.commands.size()); { api::RequestBucketInfoCommand& rbi( - dynamic_cast(*_sender.commands[1])); + dynamic_cast(*_sender.commands[_bucketSpaces.size()])); CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]); } @@ -1286,10 +1328,10 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() // Queue must be cleared once pending state is enabled. { lib::ClusterState state("distributor:1 storage:2"); - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn); } - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { api::RequestBucketInfoCommand& rbi( dynamic_cast(*_sender.commands[0])); @@ -1527,32 +1569,32 @@ void BucketDBUpdaterTest::testPendingClusterStateSendMessages() { CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("cluster:d", "distributor:1 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1"), + getNodeList({0, 1}), getSentNodes("cluster:d", "distributor:1 storage:3 .2.s:m")); CPPUNIT_ASSERT_EQUAL( - std::string("2"), + getNodeList({2}), getSentNodes("distributor:1 storage:2", "distributor:1 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("2,3,4,5"), + getNodeList({2, 3, 4, 5}), getSentNodes("distributor:1 storage:2", "distributor:1 storage:6")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:4 storage:3", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2,3"), + getNodeList({0, 1, 2, 3}), getSentNodes("distributor:4 storage:3", "distributor:4 .2.s:d storage:4")); @@ -1567,17 +1609,17 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:4 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("2"), + getNodeList({2}), getSentNodes("distributor:3 storage:3 .2.s:i", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:3 storage:3 .1.s:d", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1,2,4"), + getNodeList({1, 2, 4}), getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i", "distributor:3 storage:5")); @@ -1602,7 +1644,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:m storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:3 .2.s:m storage:3", "distributor:3 .2.s:d storage:3")); @@ -1612,21 +1654,21 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodesDistributionChanged("distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1"), + getNodeList({0, 1}), getSentNodes("distributor:10 storage:2", "distributor:10 .1.s:d storage:2")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2 .1.s:d", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); @@ -1636,7 +1678,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:i storage:2")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:3 storage:3", "distributor:3 .2.s:s storage:3")); @@ -1646,7 +1688,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:d storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:3 storage:3 .1.s:m", "distributor:3 storage:3")); @@ -1672,7 +1714,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); - CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), sender.commands.size()); sortSentMessagesByIndex(sender); @@ -1713,7 +1755,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDown() // Entire group 1 goes down. Must refetch from all nodes. CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2,3,4,5"), + getNodeList({0, 1, 2, 3, 4, 5}), getSentNodes("distributor:6 storage:6", "distributor:6 .2.s:d .3.s:d storage:6")); @@ -1733,7 +1775,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover() // Group is down, but config says to not do anything about it. CPPUNIT_ASSERT_EQUAL( - std::string(""), + getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1), getSentNodes("distributor:6 storage:6", "distributor:6 .2.s:d .3.s:d storage:6")); } @@ -1973,7 +2015,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); @@ -1988,7 +2030,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() lib::ClusterState stateAfter("distributor:3 storage:3"); { - uint32_t expectedMsgs = 2, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 2 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn); } CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent() @@ -2005,7 +2047,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); @@ -2042,38 +2084,35 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP { lib::ClusterState stateBefore("distributor:6 storage:6"); { - uint32_t expectedMsgs = 6, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); std::string distConfig(getDistConfig6Nodes3Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); // Suddenly, a wild cluster state change appears! Even though this state // does not in itself imply any bucket changes, it will still overwrite the // pending cluster state and thus its state of pending bucket info requests. setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(12), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size()); - // Send replies for first 6 (outdated requests). + // Send replies for first 6 * _bucketSpaces.siz() (outdated requests). int numBuckets = 10; - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets); } // No change from these. assertCorrectBuckets(1, "distributor:6 storage:6"); // Send for current pending. - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 .2.t:12345 storage:6"), - dynamic_cast(*_sender.commands[i + 6]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), + *_sender.commands[i + 6 * _bucketSpaces.size()], + numBuckets); } assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); _sender.clear(); @@ -2086,7 +2125,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP void BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() { - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6, 20); + setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6 * _bucketSpaces.size(), 20); _sender.clear(); // First cluster state; implicit scan of all buckets which does not // use normal recovery mode ticking-path. @@ -2098,13 +2137,11 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() // No replies received yet, still no recovery mode. CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); uint32_t numBuckets = 10; - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets); } // Pending cluster state (i.e. distribution) has been enabled, which should @@ -2118,7 +2155,7 @@ BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp() getClock().setAbsoluteTimeInSeconds(101234); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } @@ -2134,7 +2171,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() { { lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i"); - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 0; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0; // This step is required to make the distributor ready for accepting // the below explicit database insertion towards node 0. setAndEnableClusterState(stateBefore, expectedMsgs, @@ -2144,7 +2181,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() getClock().setAbsoluteTimeInSeconds(1000); lib::ClusterState state("distributor:1 storage:1"); setSystemState(state); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Before replying with the bucket info, simulate the arrival of a mutation // reply that alters the state of the bucket with information that will be @@ -2168,11 +2205,9 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() // happening before t=1000 but receiving a reply at t>1000 does not affect // correctness, as this should contain the same bucket info as that // contained in the full bucket reply and the DB update is thus idempotent. - fakeBucketReply( - state, - dynamic_cast(*_sender.commands[0]), - 0, - bucketsReturned); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + fakeBucketReply(state, *_sender.commands[i], bucketsReturned); + } BucketDatabase::Entry e(getBucket(bucket)); CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount()); @@ -2231,8 +2266,9 @@ void BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{0, 1, 2, 3, 4, 5}), - getSentNodesWithPreemption("version:1 distributor:6 storage:6", 6, + expandNodeVec({0, 1, 2, 3, 4, 5}), + getSentNodesWithPreemption("version:1 distributor:6 storage:6", + 6 * _bucketSpaces.size(), "version:2 distributor:6 .5.s:d storage:6", "version:3 distributor:6 storage:6")); } @@ -2241,9 +2277,10 @@ void BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{2, 3}), + expandNodeVec({2, 3}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:1 distributor:6 storage:6 .2.s:d", + 5 * _bucketSpaces.size(), "version:2 distributor:6 storage:6 .2.s:d .3.s:d", "version:3 distributor:6 storage:6")); } @@ -2252,9 +2289,10 @@ void BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{2}), + expandNodeVec({2}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:6", 6, + "version:1 distributor:6 storage:6", + 6 * _bucketSpaces.size(), "version:2 distributor:6 storage:6 .2.s:d", "version:3 distributor:6 storage:6")); } @@ -2265,7 +2303,8 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() CPPUNIT_ASSERT_EQUAL( nodeVec{}, getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:1 distributor:6 storage:6 .2.s:d", + 5 * _bucketSpaces.size(), "version:2 distributor:6 storage:6", // Sends to 2. "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. } @@ -2276,9 +2315,10 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() // Even though 100 nodes are preempted, not all of these should be part // of the request afterwards when only 6 are part of the state. CPPUNIT_ASSERT_EQUAL( - (nodeVec{0, 1, 2, 3, 4, 5}), + expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:100", 100, + "version:1 distributor:6 storage:100", + 100 * _bucketSpaces.size(), "version:2 distributor:5 .4.s:d storage:100", "version:3 distributor:6 storage:6")); } @@ -2288,7 +2328,7 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() { lib::ClusterState stateBefore( "version:1 distributor:6 storage:6 .1.t:1234"); - uint32_t expectedMsgs = 6, dummyBucketsToReturn = 10; + uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 10; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); _sender.clear(); // New cluster state that should not by itself trigger any new fetches, @@ -2327,7 +2367,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() { // Same number of online disks, but the set of disks has changed. CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); } @@ -2343,7 +2383,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() void BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() { - uint32_t expectedMsgs = 3, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 3 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), expectedMsgs, dummyBucketsToReturn); _sender.clear(); @@ -2371,11 +2411,11 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() // Attempt to apply state with {0, 1} set. This will compare the new state // with the previous state, which still has node 2. - expectedMsgs = 2; + expectedMsgs = 2 * _bucketSpaces.size(); setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), expectedMsgs, dummyBucketsToReturn); - CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1}), getSendSet()); + CPPUNIT_ASSERT_EQUAL(expandNodeVec({0, 1}), getSendSet()); } void @@ -2413,7 +2453,7 @@ BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer() void BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis()); } @@ -2421,8 +2461,8 @@ BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() void BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2); - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis()); } @@ -2431,13 +2471,13 @@ void BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change() { lib::ClusterState state("distributor:2 storage:2"); - setAndEnableClusterState(state, 2, 1); + setAndEnableClusterState(state, 2 * _bucketSpaces.size(), 1); _sender.clear(); std::string distConfig(getDistConfig3Nodes1Group()); setDistribution(distConfig); getClock().addSecondsToTime(4); - completeBucketInfoGathering(state, 2); + completeBucketInfoGathering(state, 2 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis()); } @@ -2451,7 +2491,7 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() // Pre-empted with new state here, which will push out the old pending // state and replace it with a new one. We should still count the time // used processing the old state. - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis()); } -- cgit v1.2.3