From ab30a107007f91bd207980b670356b8acc50ad63 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 14 Feb 2018 09:51:22 +0000 Subject: Adjust distributor unit tests to handle global distributor bucket space. --- .../src/tests/distributor/bucketdbupdatertest.cpp | 516 +++++++++++---------- storage/src/tests/distributor/distributortest.cpp | 45 +- .../src/tests/distributor/distributortestutil.cpp | 10 + .../src/tests/distributor/distributortestutil.h | 1 + .../tests/distributor/idealstatemanagertest.cpp | 31 +- 5 files changed, 347 insertions(+), 256 deletions(-) diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index b79894aee9a..2db7be19712 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -13,18 +13,45 @@ #include #include #include +#include #include #include #include #include +#include using namespace storage::api; using namespace storage::lib; using document::test::makeDocumentBucket; using document::test::makeBucketSpace; +using document::BucketSpace; +using document::FixedBucketSpaces; namespace storage::distributor { +namespace { + +std::string +getStringList(std::string s, uint32_t count) +{ + std::ostringstream ost; + for (uint32_t i = 0; i < count; ++i) { + if (i > 0) { + ost << ","; + } + ost << s; + } + return ost.str(); +} + +std::string +getRequestBucketInfoStrings(uint32_t count) +{ + return getStringList("Request bucket info", count); +} + +} + class BucketDBUpdaterTest : public CppUnit::TestFixture, public DistributorTestUtil { @@ -87,6 +114,9 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST_SUITE_END(); +public: + BucketDBUpdaterTest(); + protected: void testNormalUsage(); void testDistributorChange(); @@ -161,10 +191,20 @@ protected: return clusterInfo; } + static std::string getNodeList(std::vector nodes, size_t count); + + std::string getNodeList(std::vector nodes); + + std::vector + expandNodeVec(const std::vector &nodes); + + std::vector _bucketSpaces; + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { createLinks(); + _bucketSpaces = getBucketSpaces(); }; void tearDown() override { @@ -173,17 +213,17 @@ public: std::shared_ptr getFakeBucketReply( const lib::ClusterState& state, - RequestBucketInfoCommand& cmd, + const RequestBucketInfoCommand& cmd, int storageIndex, - int bucketCount, - int invalidBucketCount = 0) + uint32_t bucketCount, + uint32_t invalidBucketCount = 0) { RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd); sreply->setAddress(storageAddress(storageIndex)); api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - for (int i=0; i(sreply); } - void fakeBucketReply( - const lib::ClusterState& state, - RequestBucketInfoCommand& cmd, - int storageIndex, - int bucketCount, - int invalidBucketCount = 0) + void fakeBucketReply(const lib::ClusterState &state, + const api::StorageCommand &cmd, + uint32_t bucketCount, + uint32_t invalidBucketCount = 0) { + CPPUNIT_ASSERT(cmd.getType() == MessageType::REQUESTBUCKETINFO); + const api::StorageMessageAddress &address(*cmd.getAddress()); getBucketDBUpdater().onRequestBucketInfoReply( getFakeBucketReply(state, - cmd, - storageIndex, + dynamic_cast(cmd), + address.getIndex(), bucketCount, invalidBucketCount)); } @@ -332,22 +372,15 @@ public: } void completeBucketInfoGathering(const lib::ClusterState& state, - uint32_t expectedMsgs, - uint32_t nBuckets = 1) + size_t expectedMsgs, + uint32_t bucketCount = 1, + uint32_t invalidBucketCount = 0) { - CPPUNIT_ASSERT_EQUAL(size_t(expectedMsgs), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(expectedMsgs, _sender.commands.size()); for (uint32_t i = 0; i < _sender.commands.size(); i++) { - CPPUNIT_ASSERT(_sender.commands[i]->getType() == - MessageType::REQUESTBUCKETINFO); - - const api::StorageMessageAddress& address( - *_sender.commands[i]->getAddress()); - fakeBucketReply( - state, - dynamic_cast(*_sender.commands[i]), - address.getIndex(), - nBuckets); + fakeBucketReply(state, *_sender.commands[i], + bucketCount, invalidBucketCount); } } @@ -383,12 +416,12 @@ public: setSystemState(newState); - for (uint32_t i=0; igetType() == MessageType::REQUESTBUCKETINFO); const api::StorageMessageAddress *address = _sender.commands[i]->getAddress(); - CPPUNIT_ASSERT_EQUAL(i, (uint32_t)address->getIndex()); + CPPUNIT_ASSERT_EQUAL((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex()); } } @@ -401,11 +434,8 @@ public: "distributor:1 storage:%d", numStorageNodes)); lib::ClusterState newState(state); - for (uint32_t i=0; i( *_sender.commands[0]).getDistributionHash()); - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[0]), - 0, 10); + fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + *_sender.commands[0], 10); _sender.clear(); @@ -605,17 +640,12 @@ BucketDBUpdaterTest::testNormalUsage() // change is only implemented after completion of previous cluster state setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); // Expect reply of first set SystemState request. CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[i]), - i, 10); - } - + completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + 3 * _bucketSpaces.size(), 10); assertCorrectBuckets(10, "distributor:2 storage:3"); } @@ -626,13 +656,9 @@ BucketDBUpdaterTest::testDistributorChange() // First sends request setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + 3 * _bucketSpaces.size(), numBuckets); _sender.clear(); // No change from initializing to up (when done with last job) @@ -648,13 +674,9 @@ BucketDBUpdaterTest::testDistributorChange() // Removing distributor. Need to refetch new data from all nodes. setSystemState(lib::ClusterState("distributor:2 storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); - for (uint32_t i = 0; i < 3; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:2 storage:3"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), + 3 * _bucketSpaces.size(), numBuckets); _sender.clear(); assertCorrectBuckets(numBuckets, "distributor:2 storage:3"); } @@ -667,13 +689,9 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() int numBuckets = 100; setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); - } + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); + completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), + 6 * _bucketSpaces.size(), numBuckets); _sender.clear(); // Distributor going down in other group, no change @@ -693,7 +711,7 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping() // Changed grouping cause change setDistribution(getDistConfig6Nodes4Groups()); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); } void @@ -701,16 +719,13 @@ BucketDBUpdaterTest::testNormalUsageInitializing() { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Not yet passing on system state. CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 10, - 10); + completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 10, 10); assertCorrectBuckets(10, "distributor:1 storage:1"); @@ -727,12 +742,10 @@ BucketDBUpdaterTest::testNormalUsageInitializing() setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // Send a new request bucket info up. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 20); + completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 20); // Pass on cluster state and recheck buckets now. CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); @@ -746,33 +759,34 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo() setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { - std::shared_ptr reply = - getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 10); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + std::shared_ptr reply = + getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[i].get()), + 0, + 10); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + } - reply->setResult(api::ReturnCode::NOT_CONNECTED); - getBucketDBUpdater().onRequestBucketInfoReply(reply); - // Trigger that delayed message is sent + // Trigger that delayed message is sent getClock().addSecondsToTime(10); getBucketDBUpdater().resendDelayedMessages(); } // Should be resent. - CPPUNIT_ASSERT_EQUAL(std::string("Request bucket info," - "Request bucket info"), + CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(2 * _bucketSpaces.size()), _sender.getCommands()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[1].get()), - 0, - 10); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *_sender.commands[_bucketSpaces.size() + i], 10); + } for (int i=0; i<10; i++) { CPPUNIT_ASSERT_EQUAL( @@ -791,14 +805,14 @@ BucketDBUpdaterTest::testEncodeErrorHandling() { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Not yet passing on system state. CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); - { + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { std::shared_ptr reply = getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), + *((RequestBucketInfoCommand*)_sender.commands[i].get()), 0, 10); @@ -815,21 +829,15 @@ BucketDBUpdaterTest::testDownWhileInit() setStorageNodes(3); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[0].get()), - 0, - 5); + *_sender.commands[0], 5); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[2].get()), - 2, - 5); + *_sender.commands[2], 5); fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *((RequestBucketInfoCommand*)_sender.commands[1].get()), - 1, - 5); + *_sender.commands[1], 5); } bool @@ -844,6 +852,42 @@ BucketDBUpdaterTest::bucketExistsThatHasNode(int bucketCount, uint16_t node) con return false; } +std::string +BucketDBUpdaterTest::getNodeList(std::vector nodes, size_t count) +{ + std::ostringstream ost; + bool first = true; + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + if (!first) { + ost << ","; + } + ost << node; + first = false; + } + } + return ost.str(); +} + +std::string +BucketDBUpdaterTest::getNodeList(std::vector nodes) +{ + return getNodeList(std::move(nodes), _bucketSpaces.size()); +} + +std::vector +BucketDBUpdaterTest::expandNodeVec(const std::vector &nodes) +{ + std::vector res; + size_t count = _bucketSpaces.size(); + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + res.push_back(node); + } + } + return res; +} + void BucketDBUpdaterTest::testNodeDown() { @@ -903,16 +947,13 @@ BucketDBUpdaterTest::testInitializingWhileRecheck() lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); setSystemState(systemState); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(2 * _bucketSpaces.size(), _sender.commands.size()); CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - for (int i=0; i<2; i++) { - fakeBucketReply(systemState, - *((RequestBucketInfoCommand*)_sender.commands[i].get()), - i, - 100); + for (uint32_t i = 0; i < 2 * _bucketSpaces.size(); ++i) { + fakeBucketReply(systemState, *_sender.commands[i], 100); } // Now we can pass on system state. @@ -931,35 +972,36 @@ BucketDBUpdaterTest::testBitChange() { setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); - - CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); - - RequestBucketInfoReply* sreply = - new RequestBucketInfoReply(*((RequestBucketInfoCommand*)_sender.commands[0].get())); - sreply->setAddress(storageAddress(0)); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); - + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast(*_sender.commands[bsi]); + RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + sreply->setAddress(storageAddress(0)); + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + int cnt=0; + for (int i=0; cnt < 2; i++) { + lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); + std::vector distributors; + if (distribution.getIdealDistributorNode( + lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + document::BucketId(16, i)) + == 0) + { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); - int cnt=0; - for (int i=0; cnt < 2; i++) { - lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); - std::vector distributors; - if (distribution.getIdealDistributorNode( - lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), - document::BucketId(16, i)) - == 0) - { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); - - bucketlist.push_back(document::BucketId(16, i)); - cnt++; + bucketlist.push_back(document::BucketId(16, i)); + cnt++; + } + } } - } - getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr(sreply)); + getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr(sreply)); + } } CPPUNIT_ASSERT_EQUAL( @@ -975,29 +1017,31 @@ BucketDBUpdaterTest::testBitChange() _sender.clear(); setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); - - CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + + CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast(*_sender.commands[bsi]); + RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + sreply->setAddress(storageAddress(0)); + sreply->setResult(api::ReturnCode::OK); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + for (uint32_t i = 0; i < 3; ++i) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } - RequestBucketInfoReply* sreply = - new RequestBucketInfoReply( - *((RequestBucketInfoCommand*)_sender.commands[0].get())); - sreply->setAddress(storageAddress(0)); - sreply->setResult(api::ReturnCode::OK); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, 4), + api::BucketInfo(10,1,1))); + } - for (uint32_t i = 0; i < 3; ++i) { - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); + getBucketDBUpdater().onRequestBucketInfoReply( + std::shared_ptr(sreply)); } - - vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, 4), - api::BucketInfo(10,1,1))); - - getBucketDBUpdater().onRequestBucketInfoReply( - std::shared_ptr(sreply)); } CPPUNIT_ASSERT_EQUAL( @@ -1256,7 +1300,7 @@ void BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() { setSystemState(lib::ClusterState("distributor:1 storage:1")); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); @@ -1266,18 +1310,16 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() getBucketDBUpdater().onNotifyBucketChange(cmd); } - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); - fakeBucketReply( - lib::ClusterState("distributor:1 storage:1"), - dynamic_cast(*_sender.commands[0]), - 0, 10); + completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"), + _bucketSpaces.size(), 10); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size() + 1, _sender.commands.size()); { api::RequestBucketInfoCommand& rbi( - dynamic_cast(*_sender.commands[1])); + dynamic_cast(*_sender.commands[_bucketSpaces.size()])); CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]); } @@ -1286,10 +1328,10 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() // Queue must be cleared once pending state is enabled. { lib::ClusterState state("distributor:1 storage:2"); - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn); } - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); { api::RequestBucketInfoCommand& rbi( dynamic_cast(*_sender.commands[0])); @@ -1527,32 +1569,32 @@ void BucketDBUpdaterTest::testPendingClusterStateSendMessages() { CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("cluster:d", "distributor:1 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1"), + getNodeList({0, 1}), getSentNodes("cluster:d", "distributor:1 storage:3 .2.s:m")); CPPUNIT_ASSERT_EQUAL( - std::string("2"), + getNodeList({2}), getSentNodes("distributor:1 storage:2", "distributor:1 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("2,3,4,5"), + getNodeList({2, 3, 4, 5}), getSentNodes("distributor:1 storage:2", "distributor:1 storage:6")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:4 storage:3", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2,3"), + getNodeList({0, 1, 2, 3}), getSentNodes("distributor:4 storage:3", "distributor:4 .2.s:d storage:4")); @@ -1567,17 +1609,17 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:4 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("2"), + getNodeList({2}), getSentNodes("distributor:3 storage:3 .2.s:i", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:3 storage:3 .1.s:d", "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1,2,4"), + getNodeList({1, 2, 4}), getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i", "distributor:3 storage:5")); @@ -1602,7 +1644,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:m storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:3 .2.s:m storage:3", "distributor:3 .2.s:d storage:3")); @@ -1612,21 +1654,21 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodesDistributionChanged("distributor:3 storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1"), + getNodeList({0, 1}), getSentNodes("distributor:10 storage:2", "distributor:10 .1.s:d storage:2")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2 .1.s:d", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); @@ -1636,7 +1678,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:i storage:2")); CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2"), + getNodeList({0, 1, 2}), getSentNodes("distributor:3 storage:3", "distributor:3 .2.s:s storage:3")); @@ -1646,7 +1688,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages() "distributor:3 .2.s:d storage:3")); CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:3 storage:3 .1.s:m", "distributor:3 storage:3")); @@ -1672,7 +1714,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); - CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), sender.commands.size()); sortSentMessagesByIndex(sender); @@ -1713,7 +1755,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDown() // Entire group 1 goes down. Must refetch from all nodes. CPPUNIT_ASSERT_EQUAL( - std::string("0,1,2,3,4,5"), + getNodeList({0, 1, 2, 3, 4, 5}), getSentNodes("distributor:6 storage:6", "distributor:6 .2.s:d .3.s:d storage:6")); @@ -1733,7 +1775,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover() // Group is down, but config says to not do anything about it. CPPUNIT_ASSERT_EQUAL( - std::string(""), + getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1), getSentNodes("distributor:6 storage:6", "distributor:6 .2.s:d .3.s:d storage:6")); } @@ -1973,7 +2015,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); @@ -1988,7 +2030,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() lib::ClusterState stateAfter("distributor:3 storage:3"); { - uint32_t expectedMsgs = 2, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 2 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn); } CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent() @@ -2005,7 +2047,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); @@ -2042,38 +2084,35 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP { lib::ClusterState stateBefore("distributor:6 storage:6"); { - uint32_t expectedMsgs = 6, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } _sender.clear(); std::string distConfig(getDistConfig6Nodes3Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); // Suddenly, a wild cluster state change appears! Even though this state // does not in itself imply any bucket changes, it will still overwrite the // pending cluster state and thus its state of pending bucket info requests. setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(12), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size()); - // Send replies for first 6 (outdated requests). + // Send replies for first 6 * _bucketSpaces.siz() (outdated requests). int numBuckets = 10; - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets); } // No change from these. assertCorrectBuckets(1, "distributor:6 storage:6"); // Send for current pending. - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 .2.t:12345 storage:6"), - dynamic_cast(*_sender.commands[i + 6]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), + *_sender.commands[i + 6 * _bucketSpaces.size()], + numBuckets); } assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); _sender.clear(); @@ -2086,7 +2125,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP void BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() { - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6, 20); + setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6 * _bucketSpaces.size(), 20); _sender.clear(); // First cluster state; implicit scan of all buckets which does not // use normal recovery mode ticking-path. @@ -2098,13 +2137,11 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() // No replies received yet, still no recovery mode. CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); - CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size()); uint32_t numBuckets = 10; - for (uint32_t i = 0; i < 6; ++i) { - fakeBucketReply( - lib::ClusterState("distributor:6 storage:6"), - dynamic_cast(*_sender.commands[i]), - i, numBuckets); + for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { + fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets); } // Pending cluster state (i.e. distribution) has been enabled, which should @@ -2118,7 +2155,7 @@ BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp() getClock().setAbsoluteTimeInSeconds(101234); lib::ClusterState stateBefore("distributor:1 storage:1"); { - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); } @@ -2134,7 +2171,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() { { lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i"); - uint32_t expectedMsgs = 1, dummyBucketsToReturn = 0; + uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0; // This step is required to make the distributor ready for accepting // the below explicit database insertion towards node 0. setAndEnableClusterState(stateBefore, expectedMsgs, @@ -2144,7 +2181,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() getClock().setAbsoluteTimeInSeconds(1000); lib::ClusterState state("distributor:1 storage:1"); setSystemState(state); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); // Before replying with the bucket info, simulate the arrival of a mutation // reply that alters the state of the bucket with information that will be @@ -2168,11 +2205,9 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() // happening before t=1000 but receiving a reply at t>1000 does not affect // correctness, as this should contain the same bucket info as that // contained in the full bucket reply and the DB update is thus idempotent. - fakeBucketReply( - state, - dynamic_cast(*_sender.commands[0]), - 0, - bucketsReturned); + for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { + fakeBucketReply(state, *_sender.commands[i], bucketsReturned); + } BucketDatabase::Entry e(getBucket(bucket)); CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount()); @@ -2231,8 +2266,9 @@ void BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{0, 1, 2, 3, 4, 5}), - getSentNodesWithPreemption("version:1 distributor:6 storage:6", 6, + expandNodeVec({0, 1, 2, 3, 4, 5}), + getSentNodesWithPreemption("version:1 distributor:6 storage:6", + 6 * _bucketSpaces.size(), "version:2 distributor:6 .5.s:d storage:6", "version:3 distributor:6 storage:6")); } @@ -2241,9 +2277,10 @@ void BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{2, 3}), + expandNodeVec({2, 3}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:1 distributor:6 storage:6 .2.s:d", + 5 * _bucketSpaces.size(), "version:2 distributor:6 storage:6 .2.s:d .3.s:d", "version:3 distributor:6 storage:6")); } @@ -2252,9 +2289,10 @@ void BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() { CPPUNIT_ASSERT_EQUAL( - (nodeVec{2}), + expandNodeVec({2}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:6", 6, + "version:1 distributor:6 storage:6", + 6 * _bucketSpaces.size(), "version:2 distributor:6 storage:6 .2.s:d", "version:3 distributor:6 storage:6")); } @@ -2265,7 +2303,8 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() CPPUNIT_ASSERT_EQUAL( nodeVec{}, getSentNodesWithPreemption( - "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:1 distributor:6 storage:6 .2.s:d", + 5 * _bucketSpaces.size(), "version:2 distributor:6 storage:6", // Sends to 2. "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. } @@ -2276,9 +2315,10 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() // Even though 100 nodes are preempted, not all of these should be part // of the request afterwards when only 6 are part of the state. CPPUNIT_ASSERT_EQUAL( - (nodeVec{0, 1, 2, 3, 4, 5}), + expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption( - "version:1 distributor:6 storage:100", 100, + "version:1 distributor:6 storage:100", + 100 * _bucketSpaces.size(), "version:2 distributor:5 .4.s:d storage:100", "version:3 distributor:6 storage:6")); } @@ -2288,7 +2328,7 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() { lib::ClusterState stateBefore( "version:1 distributor:6 storage:6 .1.t:1234"); - uint32_t expectedMsgs = 6, dummyBucketsToReturn = 10; + uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 10; setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); _sender.clear(); // New cluster state that should not by itself trigger any new fetches, @@ -2327,7 +2367,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() { // Same number of online disks, but the set of disks has changed. CPPUNIT_ASSERT_EQUAL( - std::string("1"), + getNodeList({1}), getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); } @@ -2343,7 +2383,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() void BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() { - uint32_t expectedMsgs = 3, dummyBucketsToReturn = 1; + uint32_t expectedMsgs = 3 * _bucketSpaces.size(), dummyBucketsToReturn = 1; setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), expectedMsgs, dummyBucketsToReturn); _sender.clear(); @@ -2371,11 +2411,11 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() // Attempt to apply state with {0, 1} set. This will compare the new state // with the previous state, which still has node 2. - expectedMsgs = 2; + expectedMsgs = 2 * _bucketSpaces.size(); setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), expectedMsgs, dummyBucketsToReturn); - CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1}), getSendSet()); + CPPUNIT_ASSERT_EQUAL(expandNodeVec({0, 1}), getSendSet()); } void @@ -2413,7 +2453,7 @@ BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer() void BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis()); } @@ -2421,8 +2461,8 @@ BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() void BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes() { - completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2); - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1); + completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size()); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis()); } @@ -2431,13 +2471,13 @@ void BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change() { lib::ClusterState state("distributor:2 storage:2"); - setAndEnableClusterState(state, 2, 1); + setAndEnableClusterState(state, 2 * _bucketSpaces.size(), 1); _sender.clear(); std::string distConfig(getDistConfig3Nodes1Group()); setDistribution(distConfig); getClock().addSecondsToTime(4); - completeBucketInfoGathering(state, 2); + completeBucketInfoGathering(state, 2 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis()); } @@ -2451,7 +2491,7 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() // Pre-empted with new state here, which will push out the old pending // state and replace it with a new one. We should still count the time // used processing the old state. - completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3); + completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3 * _bucketSpaces.size()); CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis()); } diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 1640af0f871..d585c4d0d32 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,7 @@ using document::test::makeDocumentBucket; using document::test::makeBucketSpace; +using document::FixedBucketSpaces; namespace storage { @@ -58,6 +60,9 @@ class Distributor_Test : public CppUnit::TestFixture, CPPUNIT_TEST(closing_aborts_priority_queued_client_requests); CPPUNIT_TEST_SUITE_END(); +public: + Distributor_Test(); + protected: void testOperationGeneration(); void testOperationsGeneratedAndStartedWithoutDuplicates(); @@ -89,9 +94,12 @@ protected: void internal_messages_are_started_in_fifo_order_batch(); void closing_aborts_priority_queued_client_requests(); + std::vector _bucketSpaces; + public: void setUp() override { createLinks(); + _bucketSpaces = getBucketSpaces(); }; void tearDown() override { @@ -197,6 +205,13 @@ private: CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test); +Distributor_Test::Distributor_Test() + : CppUnit::TestFixture(), + DistributorTestUtil(), + _bucketSpaces() +{ +} + void Distributor_Test::testOperationGeneration() { @@ -752,19 +767,23 @@ void Distributor_Test::sendDownClusterStateCommand() { } void Distributor_Test::replyToSingleRequestBucketInfoCommandWith1Bucket() { - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); - CPPUNIT_ASSERT_EQUAL(api::MessageType::REQUESTBUCKETINFO, - _sender.commands[0]->getType()); - auto& bucketReq(static_cast( - *_sender.commands[0])); - auto bucketReply = bucketReq.makeReply(); - // Make sure we have a bucket to route our remove op to, or we'd get - // an immediate reply anyway. - dynamic_cast(*bucketReply) - .getBucketInfo().push_back( - api::RequestBucketInfoReply::Entry(document::BucketId(1, 1), - api::BucketInfo(20, 10, 12, 50, 60, true, true))); - _distributor->handleMessage(std::move(bucketReply)); + CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + for (uint32_t i = 0; i < _sender.commands.size(); ++i) { + CPPUNIT_ASSERT_EQUAL(api::MessageType::REQUESTBUCKETINFO, + _sender.commands[i]->getType()); + auto& bucketReq(static_cast + (*_sender.commands[i])); + auto bucketReply = bucketReq.makeReply(); + if (bucketReq.getBucketSpace() == FixedBucketSpaces::default_space()) { + // Make sure we have a bucket to route our remove op to, or we'd get + // an immediate reply anyway. + dynamic_cast(*bucketReply) + .getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(document::BucketId(1, 1), + api::BucketInfo(20, 10, 12, 50, 60, true, true))); + } + _distributor->handleMessage(std::move(bucketReply)); + } _sender.commands.clear(); } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index fedd836513b..8aa9ffadebe 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -376,4 +376,14 @@ DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); } +std::vector +DistributorTestUtil::getBucketSpaces() const +{ + std::vector res; + for (const auto &repo : getBucketSpaceRepo()) { + res.push_back(repo.first); + } + return res; +} + } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 19da0483165..45aaf6b1dc7 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -166,6 +166,7 @@ public: BucketDatabase::Entry getBucket(const document::BucketId& bId) const; + std::vector getBucketSpaces() const; protected: vdstestlib::DirConfig _config; std::unique_ptr _node; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 945ccfa1484..7103a89229d 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -9,11 +9,13 @@ #include #include #include +#include #include #include using document::test::makeDocumentBucket; using document::test::makeBucketSpace; +using document::FixedBucketSpaces; namespace storage { namespace distributor { @@ -22,9 +24,14 @@ class IdealStateManagerTest : public CppUnit::TestFixture, public DistributorTestUtil { public: - IdealStateManagerTest() {} + IdealStateManagerTest() + : CppUnit::TestFixture(), + DistributorTestUtil(), + _bucketSpaces() + {} void setUp() override { createLinks(); + _bucketSpaces = getBucketSpaces(); }; void tearDown() override { @@ -54,6 +61,9 @@ public: CPPUNIT_TEST(testBlockIdealStateOpsOnFullRequestBucketInfo); CPPUNIT_TEST(testBlockCheckForAllOperationsToSpecificBucket); CPPUNIT_TEST_SUITE_END(); +private: + std::vector _bucketSpaces; + std::string makeBucketStatusString(const std::string &defaultSpaceBucketStatus); }; CPPUNIT_TEST_SUITE_REGISTRATION(IdealStateManagerTest); @@ -91,8 +101,7 @@ IdealStateManagerTest::testStatusPage() { std::ostringstream ost; getIdealStateManager().getBucketStatus(ost); - CPPUNIT_ASSERT_EQUAL(std::string("

default - BucketSpace(0x0000000000000001)

\n" - "BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]
\n" + CPPUNIT_ASSERT_EQUAL(makeBucketStatusString("BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]
\n" "BucketId(0x4000000000000005): : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is " "higher than the configured limit of (100, 1000000)] [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true," "active=true,ready=false)]
\n"), @@ -113,8 +122,7 @@ IdealStateManagerTest::testDisabledStateChecker() { std::ostringstream ost; getIdealStateManager().getBucketStatus(ost); - CPPUNIT_ASSERT_EQUAL(std::string( - "

default - BucketSpace(0x0000000000000001)

\n" + CPPUNIT_ASSERT_EQUAL(makeBucketStatusString( "BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]
\n" "BucketId(0x4000000000000005): : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is " "higher than the configured limit of (100, 1000000)] [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true," @@ -261,6 +269,19 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket() } } +std::string +IdealStateManagerTest::makeBucketStatusString(const std::string &defaultSpaceBucketStatus) +{ + std::ostringstream ost; + for (const auto &bucketSpace : _bucketSpaces) { + ost << "

" << FixedBucketSpaces::to_string(bucketSpace) << " - " << bucketSpace << "

\n"; + if (bucketSpace == FixedBucketSpaces::default_space()) { + ost << defaultSpaceBucketStatus; + } + } + return ost.str(); +} + } // distributor } // storage -- cgit v1.2.3 From d26d2cf6779191921bd070a3709e45d4acc25640 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 14 Feb 2018 14:09:23 +0000 Subject: Repair auto-indent damage caused by confused text editor. --- storage/src/tests/distributor/bucketdbupdatertest.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 2db7be19712..bdbc3aed01c 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -986,13 +986,13 @@ BucketDBUpdaterTest::testBitChange() lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); std::vector distributors; if (distribution.getIdealDistributorNode( - lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), - document::BucketId(16, i)) + lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + document::BucketId(16, i)) == 0) { vec.push_back(api::RequestBucketInfoReply::Entry( - document::BucketId(16, i), - api::BucketInfo(10,1,1))); + document::BucketId(16, i), + api::BucketInfo(10,1,1))); bucketlist.push_back(document::BucketId(16, i)); cnt++; -- cgit v1.2.3 From 9c5bb6a1a6920ef62a2e26a80b7848c9c9bae8d5 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 14 Feb 2018 14:10:56 +0000 Subject: Fix comment. --- storage/src/tests/distributor/bucketdbupdatertest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index bdbc3aed01c..92495b37494 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -2099,7 +2099,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size()); - // Send replies for first 6 * _bucketSpaces.siz() (outdated requests). + // Send replies for first 6 * _bucketSpaces.size() (outdated requests). int numBuckets = 10; for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) { fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), -- cgit v1.2.3