aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-02-14 15:29:29 +0100
committerGitHub <noreply@github.com>2018-02-14 15:29:29 +0100
commiteadf1968da3755ae6318c5624b454d329ccf745b (patch)
treee253f16322af024d94a1995b3e8f9b931b2504e0
parent22963ff602da9338422feb2f783ebd9abc0b30d1 (diff)
parent9c5bb6a1a6920ef62a2e26a80b7848c9c9bae8d5 (diff)
Merge pull request #5035 from vespa-engine/toregge/adjust-distributor-unit-tests-for-multiple-bucket-spaces
Adjust distributor unit tests to handle global distributor bucket space.
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp516
-rw-r--r--storage/src/tests/distributor/distributortest.cpp45
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp10
-rw-r--r--storage/src/tests/distributor/distributortestutil.h1
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp31
5 files changed, 347 insertions, 256 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index b79894aee9a..92495b37494 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -13,18 +13,45 @@
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/distributor/simpleclusterinformation.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <sstream>
using namespace storage::api;
using namespace storage::lib;
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
+using document::BucketSpace;
+using document::FixedBucketSpaces;
namespace storage::distributor {
+namespace {
+
+std::string
+getStringList(std::string s, uint32_t count)
+{
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < count; ++i) {
+ if (i > 0) {
+ ost << ",";
+ }
+ ost << s;
+ }
+ return ost.str();
+}
+
+std::string
+getRequestBucketInfoStrings(uint32_t count)
+{
+ return getStringList("Request bucket info", count);
+}
+
+}
+
class BucketDBUpdaterTest : public CppUnit::TestFixture,
public DistributorTestUtil
{
@@ -87,6 +114,9 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
CPPUNIT_TEST_SUITE_END();
+public:
+ BucketDBUpdaterTest();
+
protected:
void testNormalUsage();
void testDistributorChange();
@@ -161,10 +191,20 @@ protected:
return clusterInfo;
}
+ static std::string getNodeList(std::vector<uint16_t> nodes, size_t count);
+
+ std::string getNodeList(std::vector<uint16_t> nodes);
+
+ std::vector<uint16_t>
+ expandNodeVec(const std::vector<uint16_t> &nodes);
+
+ std::vector<document::BucketSpace> _bucketSpaces;
+
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
void setUp() override {
createLinks();
+ _bucketSpaces = getBucketSpaces();
};
void tearDown() override {
@@ -173,17 +213,17 @@ public:
std::shared_ptr<RequestBucketInfoReply> getFakeBucketReply(
const lib::ClusterState& state,
- RequestBucketInfoCommand& cmd,
+ const RequestBucketInfoCommand& cmd,
int storageIndex,
- int bucketCount,
- int invalidBucketCount = 0)
+ uint32_t bucketCount,
+ uint32_t invalidBucketCount = 0)
{
RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd);
sreply->setAddress(storageAddress(storageIndex));
api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
- for (int i=0; i<bucketCount + invalidBucketCount; i++) {
+ for (uint32_t i=0; i<bucketCount + invalidBucketCount; i++) {
if (!getBucketDBUpdater().getDistributorComponent()
.ownsBucketInState(state, makeDocumentBucket(document::BucketId(16, i)))) {
continue;
@@ -214,17 +254,17 @@ public:
return std::shared_ptr<api::RequestBucketInfoReply>(sreply);
}
- void fakeBucketReply(
- const lib::ClusterState& state,
- RequestBucketInfoCommand& cmd,
- int storageIndex,
- int bucketCount,
- int invalidBucketCount = 0)
+ void fakeBucketReply(const lib::ClusterState &state,
+ const api::StorageCommand &cmd,
+ uint32_t bucketCount,
+ uint32_t invalidBucketCount = 0)
{
+ CPPUNIT_ASSERT(cmd.getType() == MessageType::REQUESTBUCKETINFO);
+ const api::StorageMessageAddress &address(*cmd.getAddress());
getBucketDBUpdater().onRequestBucketInfoReply(
getFakeBucketReply(state,
- cmd,
- storageIndex,
+ dynamic_cast<const RequestBucketInfoCommand &>(cmd),
+ address.getIndex(),
bucketCount,
invalidBucketCount));
}
@@ -332,22 +372,15 @@ public:
}
void completeBucketInfoGathering(const lib::ClusterState& state,
- uint32_t expectedMsgs,
- uint32_t nBuckets = 1)
+ size_t expectedMsgs,
+ uint32_t bucketCount = 1,
+ uint32_t invalidBucketCount = 0)
{
- CPPUNIT_ASSERT_EQUAL(size_t(expectedMsgs), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(expectedMsgs, _sender.commands.size());
for (uint32_t i = 0; i < _sender.commands.size(); i++) {
- CPPUNIT_ASSERT(_sender.commands[i]->getType() ==
- MessageType::REQUESTBUCKETINFO);
-
- const api::StorageMessageAddress& address(
- *_sender.commands[i]->getAddress());
- fakeBucketReply(
- state,
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- address.getIndex(),
- nBuckets);
+ fakeBucketReply(state, *_sender.commands[i],
+ bucketCount, invalidBucketCount);
}
}
@@ -383,12 +416,12 @@ public:
setSystemState(newState);
- for (uint32_t i=0; i<numStorageNodes; i++) {
+ for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) {
CPPUNIT_ASSERT(_sender.commands[i]->getType() ==
MessageType::REQUESTBUCKETINFO);
const api::StorageMessageAddress *address = _sender.commands[i]->getAddress();
- CPPUNIT_ASSERT_EQUAL(i, (uint32_t)address->getIndex());
+ CPPUNIT_ASSERT_EQUAL((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex());
}
}
@@ -401,11 +434,8 @@ public:
"distributor:1 storage:%d", numStorageNodes));
lib::ClusterState newState(state);
- for (uint32_t i=0; i<numStorageNodes; i++) {
- fakeBucketReply(newState,
- *((RequestBucketInfoCommand*)_sender.commands[i].get()),
- i,
- numBuckets);
+ for (uint32_t i=0; i<numStorageNodes * _bucketSpaces.size(); i++) {
+ fakeBucketReply(newState, *_sender.commands[i], numBuckets);
}
assertCorrectBuckets(numBuckets, state);
}
@@ -580,12 +610,19 @@ public:
CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest);
+BucketDBUpdaterTest::BucketDBUpdaterTest()
+ : CppUnit::TestFixture(),
+ DistributorTestUtil(),
+ _bucketSpaces()
+{
+}
+
void
BucketDBUpdaterTest::testNormalUsage()
{
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
// Ensure distribution hash is set correctly
CPPUNIT_ASSERT_EQUAL(
@@ -594,10 +631,8 @@ BucketDBUpdaterTest::testNormalUsage()
dynamic_cast<const RequestBucketInfoCommand&>(
*_sender.commands[0]).getDistributionHash());
- fakeBucketReply(
- lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]),
- 0, 10);
+ fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ *_sender.commands[0], 10);
_sender.clear();
@@ -605,17 +640,12 @@ BucketDBUpdaterTest::testNormalUsage()
// change is only implemented after completion of previous cluster state
setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
// Expect reply of first set SystemState request.
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
- for (uint32_t i = 0; i < 3; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, 10);
- }
-
+ completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ 3 * _bucketSpaces.size(), 10);
assertCorrectBuckets(10, "distributor:2 storage:3");
}
@@ -626,13 +656,9 @@ BucketDBUpdaterTest::testDistributorChange()
// First sends request
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
- for (uint32_t i = 0; i < 3; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, numBuckets);
- }
+ CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ 3 * _bucketSpaces.size(), numBuckets);
_sender.clear();
// No change from initializing to up (when done with last job)
@@ -648,13 +674,9 @@ BucketDBUpdaterTest::testDistributorChange()
// Removing distributor. Need to refetch new data from all nodes.
setSystemState(lib::ClusterState("distributor:2 storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
- for (uint32_t i = 0; i < 3; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:2 storage:3"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, numBuckets);
- }
+ CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), _sender.commands.size());
+ completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"),
+ 3 * _bucketSpaces.size(), numBuckets);
_sender.clear();
assertCorrectBuckets(numBuckets, "distributor:2 storage:3");
}
@@ -667,13 +689,9 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
int numBuckets = 100;
setSystemState(lib::ClusterState("distributor:6 storage:6"));
- CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
- for (uint32_t i = 0; i < 6; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:6 storage:6"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, numBuckets);
- }
+ CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
+ completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"),
+ 6 * _bucketSpaces.size(), numBuckets);
_sender.clear();
// Distributor going down in other group, no change
@@ -693,7 +711,7 @@ BucketDBUpdaterTest::testDistributorChangeWithGrouping()
// Changed grouping cause change
setDistribution(getDistConfig6Nodes4Groups());
- CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
}
void
@@ -701,16 +719,13 @@ BucketDBUpdaterTest::testNormalUsageInitializing()
{
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i"));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
// Not yet passing on system state.
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
- fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *((RequestBucketInfoCommand*)_sender.commands[0].get()),
- 0,
- 10,
- 10);
+ completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ _bucketSpaces.size(), 10, 10);
assertCorrectBuckets(10, "distributor:1 storage:1");
@@ -727,12 +742,10 @@ BucketDBUpdaterTest::testNormalUsageInitializing()
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1"));
// Send a new request bucket info up.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
- fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *((RequestBucketInfoCommand*)_sender.commands[0].get()),
- 0,
- 20);
+ completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ _bucketSpaces.size(), 20);
// Pass on cluster state and recheck buckets now.
CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size());
@@ -746,33 +759,34 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo()
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1"));
// 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
{
- std::shared_ptr<api::RequestBucketInfoReply> reply =
- getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *((RequestBucketInfoCommand*)_sender.commands[0].get()),
- 0,
- 10);
+ for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
+ std::shared_ptr<api::RequestBucketInfoReply> reply =
+ getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ *((RequestBucketInfoCommand*)_sender.commands[i].get()),
+ 0,
+ 10);
+ reply->setResult(api::ReturnCode::NOT_CONNECTED);
+ getBucketDBUpdater().onRequestBucketInfoReply(reply);
+ }
- reply->setResult(api::ReturnCode::NOT_CONNECTED);
- getBucketDBUpdater().onRequestBucketInfoReply(reply);
- // Trigger that delayed message is sent
+ // Trigger that delayed message is sent
getClock().addSecondsToTime(10);
getBucketDBUpdater().resendDelayedMessages();
}
// Should be resent.
- CPPUNIT_ASSERT_EQUAL(std::string("Request bucket info,"
- "Request bucket info"),
+ CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(2 * _bucketSpaces.size()),
_sender.getCommands());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
- fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *((RequestBucketInfoCommand*)_sender.commands[1].get()),
- 0,
- 10);
+ for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
+ fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ *_sender.commands[_bucketSpaces.size() + i], 10);
+ }
for (int i=0; i<10; i++) {
CPPUNIT_ASSERT_EQUAL(
@@ -791,14 +805,14 @@ BucketDBUpdaterTest::testEncodeErrorHandling()
{
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1"));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
// Not yet passing on system state.
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
- {
+ for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
std::shared_ptr<api::RequestBucketInfoReply> reply =
getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *((RequestBucketInfoCommand*)_sender.commands[0].get()),
+ *((RequestBucketInfoCommand*)_sender.commands[i].get()),
0,
10);
@@ -815,21 +829,15 @@ BucketDBUpdaterTest::testDownWhileInit()
setStorageNodes(3);
fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *((RequestBucketInfoCommand*)_sender.commands[0].get()),
- 0,
- 5);
+ *_sender.commands[0], 5);
setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d"));
fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *((RequestBucketInfoCommand*)_sender.commands[2].get()),
- 2,
- 5);
+ *_sender.commands[2], 5);
fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *((RequestBucketInfoCommand*)_sender.commands[1].get()),
- 1,
- 5);
+ *_sender.commands[1], 5);
}
bool
@@ -844,6 +852,42 @@ BucketDBUpdaterTest::bucketExistsThatHasNode(int bucketCount, uint16_t node) con
return false;
}
+std::string
+BucketDBUpdaterTest::getNodeList(std::vector<uint16_t> nodes, size_t count)
+{
+ std::ostringstream ost;
+ bool first = true;
+ for (const auto &node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ if (!first) {
+ ost << ",";
+ }
+ ost << node;
+ first = false;
+ }
+ }
+ return ost.str();
+}
+
+std::string
+BucketDBUpdaterTest::getNodeList(std::vector<uint16_t> nodes)
+{
+ return getNodeList(std::move(nodes), _bucketSpaces.size());
+}
+
+std::vector<uint16_t>
+BucketDBUpdaterTest::expandNodeVec(const std::vector<uint16_t> &nodes)
+{
+ std::vector<uint16_t> res;
+ size_t count = _bucketSpaces.size();
+ for (const auto &node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ res.push_back(node);
+ }
+ }
+ return res;
+}
+
void
BucketDBUpdaterTest::testNodeDown()
{
@@ -903,16 +947,13 @@ BucketDBUpdaterTest::testInitializingWhileRecheck()
lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1");
setSystemState(systemState);
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(2 * _bucketSpaces.size(), _sender.commands.size());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
- for (int i=0; i<2; i++) {
- fakeBucketReply(systemState,
- *((RequestBucketInfoCommand*)_sender.commands[i].get()),
- i,
- 100);
+ for (uint32_t i = 0; i < 2 * _bucketSpaces.size(); ++i) {
+ fakeBucketReply(systemState, *_sender.commands[i], 100);
}
// Now we can pass on system state.
@@ -931,35 +972,36 @@ BucketDBUpdaterTest::testBitChange()
{
setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2"));
- CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size());
-
- CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO);
-
- RequestBucketInfoReply* sreply =
- new RequestBucketInfoReply(*((RequestBucketInfoCommand*)_sender.commands[0].get()));
- sreply->setAddress(storageAddress(0));
- api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
-
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+
+ for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
+ CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
+ RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req);
+ sreply->setAddress(storageAddress(0));
+ api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+ if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ int cnt=0;
+ for (int i=0; cnt < 2; i++) {
+ lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution();
+ std::vector<uint16_t> distributors;
+ if (distribution.getIdealDistributorNode(
+ lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"),
+ document::BucketId(16, i))
+ == 0)
+ {
+ vec.push_back(api::RequestBucketInfoReply::Entry(
+ document::BucketId(16, i),
+ api::BucketInfo(10,1,1)));
- int cnt=0;
- for (int i=0; cnt < 2; i++) {
- lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution();
- std::vector<uint16_t> distributors;
- if (distribution.getIdealDistributorNode(
- lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"),
- document::BucketId(16, i))
- == 0)
- {
- vec.push_back(api::RequestBucketInfoReply::Entry(
- document::BucketId(16, i),
- api::BucketInfo(10,1,1)));
-
- bucketlist.push_back(document::BucketId(16, i));
- cnt++;
+ bucketlist.push_back(document::BucketId(16, i));
+ cnt++;
+ }
+ }
}
- }
- getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr<RequestBucketInfoReply>(sreply));
+ getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr<RequestBucketInfoReply>(sreply));
+ }
}
CPPUNIT_ASSERT_EQUAL(
@@ -975,29 +1017,31 @@ BucketDBUpdaterTest::testBitChange()
_sender.clear();
setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2"));
- CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size());
-
- CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO);
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
+
+ CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
+ RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req);
+ sreply->setAddress(storageAddress(0));
+ sreply->setResult(api::ReturnCode::OK);
+ if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+
+ for (uint32_t i = 0; i < 3; ++i) {
+ vec.push_back(api::RequestBucketInfoReply::Entry(
+ document::BucketId(16, i),
+ api::BucketInfo(10,1,1)));
+ }
- RequestBucketInfoReply* sreply =
- new RequestBucketInfoReply(
- *((RequestBucketInfoCommand*)_sender.commands[0].get()));
- sreply->setAddress(storageAddress(0));
- sreply->setResult(api::ReturnCode::OK);
- api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+ vec.push_back(api::RequestBucketInfoReply::Entry(
+ document::BucketId(16, 4),
+ api::BucketInfo(10,1,1)));
+ }
- for (uint32_t i = 0; i < 3; ++i) {
- vec.push_back(api::RequestBucketInfoReply::Entry(
- document::BucketId(16, i),
- api::BucketInfo(10,1,1)));
+ getBucketDBUpdater().onRequestBucketInfoReply(
+ std::shared_ptr<RequestBucketInfoReply>(sreply));
}
-
- vec.push_back(api::RequestBucketInfoReply::Entry(
- document::BucketId(16, 4),
- api::BucketInfo(10,1,1)));
-
- getBucketDBUpdater().onRequestBucketInfoReply(
- std::shared_ptr<RequestBucketInfoReply>(sreply));
}
CPPUNIT_ASSERT_EQUAL(
@@ -1256,7 +1300,7 @@ void
BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
{
setSystemState(lib::ClusterState("distributor:1 storage:1"));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
{
api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
@@ -1266,18 +1310,16 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
getBucketDBUpdater().onNotifyBucketChange(cmd);
}
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
- fakeBucketReply(
- lib::ClusterState("distributor:1 storage:1"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]),
- 0, 10);
+ completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"),
+ _bucketSpaces.size(), 10);
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size() + 1, _sender.commands.size());
{
api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[1]));
+ dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[_bucketSpaces.size()]));
CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]);
}
@@ -1286,10 +1328,10 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
// Queue must be cleared once pending state is enabled.
{
lib::ClusterState state("distributor:1 storage:2");
- uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn);
}
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
{
api::RequestBucketInfoCommand& rbi(
dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
@@ -1527,32 +1569,32 @@ void
BucketDBUpdaterTest::testPendingClusterStateSendMessages()
{
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2"),
+ getNodeList({0, 1, 2}),
getSentNodes("cluster:d",
"distributor:1 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1"),
+ getNodeList({0, 1}),
getSentNodes("cluster:d",
"distributor:1 storage:3 .2.s:m"));
CPPUNIT_ASSERT_EQUAL(
- std::string("2"),
+ getNodeList({2}),
getSentNodes("distributor:1 storage:2",
"distributor:1 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("2,3,4,5"),
+ getNodeList({2, 3, 4, 5}),
getSentNodes("distributor:1 storage:2",
"distributor:1 storage:6"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2"),
+ getNodeList({0, 1, 2}),
getSentNodes("distributor:4 storage:3",
"distributor:3 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2,3"),
+ getNodeList({0, 1, 2, 3}),
getSentNodes("distributor:4 storage:3",
"distributor:4 .2.s:d storage:4"));
@@ -1567,17 +1609,17 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages()
"distributor:4 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("2"),
+ getNodeList({2}),
getSentNodes("distributor:3 storage:3 .2.s:i",
"distributor:3 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("1"),
+ getNodeList({1}),
getSentNodes("distributor:3 storage:3 .1.s:d",
"distributor:3 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("1,2,4"),
+ getNodeList({1, 2, 4}),
getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i",
"distributor:3 storage:5"));
@@ -1602,7 +1644,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages()
"distributor:3 .2.s:m storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2"),
+ getNodeList({0, 1, 2}),
getSentNodes("distributor:3 .2.s:m storage:3",
"distributor:3 .2.s:d storage:3"));
@@ -1612,21 +1654,21 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages()
"distributor:3 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2"),
+ getNodeList({0, 1, 2}),
getSentNodesDistributionChanged("distributor:3 storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1"),
+ getNodeList({0, 1}),
getSentNodes("distributor:10 storage:2",
"distributor:10 .1.s:d storage:2"));
CPPUNIT_ASSERT_EQUAL(
- std::string("1"),
+ getNodeList({1}),
getSentNodes("distributor:2 storage:2",
"distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
CPPUNIT_ASSERT_EQUAL(
- std::string("1"),
+ getNodeList({1}),
getSentNodes("distributor:2 storage:2 .1.s:d",
"distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
@@ -1636,7 +1678,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages()
"distributor:3 .2.s:i storage:2"));
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2"),
+ getNodeList({0, 1, 2}),
getSentNodes("distributor:3 storage:3",
"distributor:3 .2.s:s storage:3"));
@@ -1646,7 +1688,7 @@ BucketDBUpdaterTest::testPendingClusterStateSendMessages()
"distributor:3 .2.s:d storage:3"));
CPPUNIT_ASSERT_EQUAL(
- std::string("1"),
+ getNodeList({1}),
getSentNodes("distributor:3 storage:3 .1.s:m",
"distributor:3 storage:3"));
@@ -1672,7 +1714,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
api::Timestamp(1)));
- CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(3 * _bucketSpaces.size(), sender.commands.size());
sortSentMessagesByIndex(sender);
@@ -1713,7 +1755,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDown()
// Entire group 1 goes down. Must refetch from all nodes.
CPPUNIT_ASSERT_EQUAL(
- std::string("0,1,2,3,4,5"),
+ getNodeList({0, 1, 2, 3, 4, 5}),
getSentNodes("distributor:6 storage:6",
"distributor:6 .2.s:d .3.s:d storage:6"));
@@ -1733,7 +1775,7 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover()
// Group is down, but config says to not do anything about it.
CPPUNIT_ASSERT_EQUAL(
- std::string(""),
+ getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1),
getSentNodes("distributor:6 storage:6",
"distributor:6 .2.s:d .3.s:d storage:6"));
}
@@ -1973,7 +2015,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
- uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
}
_sender.clear();
@@ -1988,7 +2030,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
lib::ClusterState stateAfter("distributor:3 storage:3");
{
- uint32_t expectedMsgs = 2, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = 2 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn);
}
CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
@@ -2005,7 +2047,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
- uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
}
_sender.clear();
@@ -2042,38 +2084,35 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
{
lib::ClusterState stateBefore("distributor:6 storage:6");
{
- uint32_t expectedMsgs = 6, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
}
_sender.clear();
std::string distConfig(getDistConfig6Nodes3Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
- CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
// Suddenly, a wild cluster state change appears! Even though this state
// does not in itself imply any bucket changes, it will still overwrite the
// pending cluster state and thus its state of pending bucket info requests.
setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6"));
- CPPUNIT_ASSERT_EQUAL(size_t(12), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(12 * _bucketSpaces.size(), _sender.commands.size());
- // Send replies for first 6 (outdated requests).
+ // Send replies for first 6 * _bucketSpaces.size() (outdated requests).
int numBuckets = 10;
- for (uint32_t i = 0; i < 6; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:6 storage:6"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, numBuckets);
+ for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.commands[i], numBuckets);
}
// No change from these.
assertCorrectBuckets(1, "distributor:6 storage:6");
// Send for current pending.
- for (uint32_t i = 0; i < 6; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i + 6]),
- i, numBuckets);
+ for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
+ *_sender.commands[i + 6 * _bucketSpaces.size()],
+ numBuckets);
}
assertCorrectBuckets(numBuckets, "distributor:6 storage:6");
_sender.clear();
@@ -2086,7 +2125,7 @@ BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangeP
void
BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
{
- setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6, 20);
+ setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6 * _bucketSpaces.size(), 20);
_sender.clear();
// First cluster state; implicit scan of all buckets which does not
// use normal recovery mode ticking-path.
@@ -2098,13 +2137,11 @@ BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
// No replies received yet, still no recovery mode.
CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
- CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(6 * _bucketSpaces.size(), _sender.commands.size());
uint32_t numBuckets = 10;
- for (uint32_t i = 0; i < 6; ++i) {
- fakeBucketReply(
- lib::ClusterState("distributor:6 storage:6"),
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
- i, numBuckets);
+ for (uint32_t i = 0; i < 6 * _bucketSpaces.size(); ++i) {
+ fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.commands[i], numBuckets);
}
// Pending cluster state (i.e. distribution) has been enabled, which should
@@ -2118,7 +2155,7 @@ BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp()
getClock().setAbsoluteTimeInSeconds(101234);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
- uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
}
@@ -2134,7 +2171,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
{
{
lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i");
- uint32_t expectedMsgs = 1, dummyBucketsToReturn = 0;
+ uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0;
// This step is required to make the distributor ready for accepting
// the below explicit database insertion towards node 0.
setAndEnableClusterState(stateBefore, expectedMsgs,
@@ -2144,7 +2181,7 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
getClock().setAbsoluteTimeInSeconds(1000);
lib::ClusterState state("distributor:1 storage:1");
setSystemState(state);
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
// Before replying with the bucket info, simulate the arrival of a mutation
// reply that alters the state of the bucket with information that will be
@@ -2168,11 +2205,9 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
// happening before t=1000 but receiving a reply at t>1000 does not affect
// correctness, as this should contain the same bucket info as that
// contained in the full bucket reply and the DB update is thus idempotent.
- fakeBucketReply(
- state,
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]),
- 0,
- bucketsReturned);
+ for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
+ fakeBucketReply(state, *_sender.commands[i], bucketsReturned);
+ }
BucketDatabase::Entry e(getBucket(bucket));
CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount());
@@ -2231,8 +2266,9 @@ void
BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch()
{
CPPUNIT_ASSERT_EQUAL(
- (nodeVec{0, 1, 2, 3, 4, 5}),
- getSentNodesWithPreemption("version:1 distributor:6 storage:6", 6,
+ expandNodeVec({0, 1, 2, 3, 4, 5}),
+ getSentNodesWithPreemption("version:1 distributor:6 storage:6",
+ 6 * _bucketSpaces.size(),
"version:2 distributor:6 .5.s:d storage:6",
"version:3 distributor:6 storage:6"));
}
@@ -2241,9 +2277,10 @@ void
BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch()
{
CPPUNIT_ASSERT_EQUAL(
- (nodeVec{2, 3}),
+ expandNodeVec({2, 3}),
getSentNodesWithPreemption(
- "version:1 distributor:6 storage:6 .2.s:d", 5,
+ "version:1 distributor:6 storage:6 .2.s:d",
+ 5 * _bucketSpaces.size(),
"version:2 distributor:6 storage:6 .2.s:d .3.s:d",
"version:3 distributor:6 storage:6"));
}
@@ -2252,9 +2289,10 @@ void
BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched()
{
CPPUNIT_ASSERT_EQUAL(
- (nodeVec{2}),
+ expandNodeVec({2}),
getSentNodesWithPreemption(
- "version:1 distributor:6 storage:6", 6,
+ "version:1 distributor:6 storage:6",
+ 6 * _bucketSpaces.size(),
"version:2 distributor:6 storage:6 .2.s:d",
"version:3 distributor:6 storage:6"));
}
@@ -2265,7 +2303,8 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState()
CPPUNIT_ASSERT_EQUAL(
nodeVec{},
getSentNodesWithPreemption(
- "version:1 distributor:6 storage:6 .2.s:d", 5,
+ "version:1 distributor:6 storage:6 .2.s:d",
+ 5 * _bucketSpaces.size(),
"version:2 distributor:6 storage:6", // Sends to 2.
"version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
}
@@ -2276,9 +2315,10 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState()
// Even though 100 nodes are preempted, not all of these should be part
// of the request afterwards when only 6 are part of the state.
CPPUNIT_ASSERT_EQUAL(
- (nodeVec{0, 1, 2, 3, 4, 5}),
+ expandNodeVec({0, 1, 2, 3, 4, 5}),
getSentNodesWithPreemption(
- "version:1 distributor:6 storage:100", 100,
+ "version:1 distributor:6 storage:100",
+ 100 * _bucketSpaces.size(),
"version:2 distributor:5 .4.s:d storage:100",
"version:3 distributor:6 storage:6"));
}
@@ -2288,7 +2328,7 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion()
{
lib::ClusterState stateBefore(
"version:1 distributor:6 storage:6 .1.t:1234");
- uint32_t expectedMsgs = 6, dummyBucketsToReturn = 10;
+ uint32_t expectedMsgs = 6 * _bucketSpaces.size(), dummyBucketsToReturn = 10;
setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
_sender.clear();
// New cluster state that should not by itself trigger any new fetches,
@@ -2327,7 +2367,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch()
{
// Same number of online disks, but the set of disks has changed.
CPPUNIT_ASSERT_EQUAL(
- std::string("1"),
+ getNodeList({1}),
getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d",
"distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
}
@@ -2343,7 +2383,7 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch()
void
BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
{
- uint32_t expectedMsgs = 3, dummyBucketsToReturn = 1;
+ uint32_t expectedMsgs = 3 * _bucketSpaces.size(), dummyBucketsToReturn = 1;
setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"),
expectedMsgs, dummyBucketsToReturn);
_sender.clear();
@@ -2371,11 +2411,11 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
// Attempt to apply state with {0, 1} set. This will compare the new state
// with the previous state, which still has node 2.
- expectedMsgs = 2;
+ expectedMsgs = 2 * _bucketSpaces.size();
setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"),
expectedMsgs, dummyBucketsToReturn);
- CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1}), getSendSet());
+ CPPUNIT_ASSERT_EQUAL(expandNodeVec({0, 1}), getSendSet());
}
void
@@ -2413,7 +2453,7 @@ BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer()
void
BucketDBUpdaterTest::transition_time_tracked_for_single_state_change()
{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2);
+ completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size());
CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis());
}
@@ -2421,8 +2461,8 @@ BucketDBUpdaterTest::transition_time_tracked_for_single_state_change()
void
BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes()
{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2);
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1);
+ completeStateTransitionInSeconds("distributor:2 storage:2", 5, 2 * _bucketSpaces.size());
+ completeStateTransitionInSeconds("distributor:2 storage:3", 3, 1 * _bucketSpaces.size());
CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis());
}
@@ -2431,13 +2471,13 @@ void
BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change()
{
lib::ClusterState state("distributor:2 storage:2");
- setAndEnableClusterState(state, 2, 1);
+ setAndEnableClusterState(state, 2 * _bucketSpaces.size(), 1);
_sender.clear();
std::string distConfig(getDistConfig3Nodes1Group());
setDistribution(distConfig);
getClock().addSecondsToTime(4);
- completeBucketInfoGathering(state, 2);
+ completeBucketInfoGathering(state, 2 * _bucketSpaces.size());
CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis());
}
@@ -2451,7 +2491,7 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions()
// Pre-empted with new state here, which will push out the old pending
// state and replace it with a new one. We should still count the time
// used processing the old state.
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3);
+ completeStateTransitionInSeconds("distributor:2 storage:3", 3, 3 * _bucketSpaces.size());
CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis());
}
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 1640af0f871..d585c4d0d32 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/config/config-stor-distributormanager.h>
@@ -18,6 +19,7 @@
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
+using document::FixedBucketSpaces;
namespace storage {
@@ -58,6 +60,9 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(closing_aborts_priority_queued_client_requests);
CPPUNIT_TEST_SUITE_END();
+public:
+ Distributor_Test();
+
protected:
void testOperationGeneration();
void testOperationsGeneratedAndStartedWithoutDuplicates();
@@ -89,9 +94,12 @@ protected:
void internal_messages_are_started_in_fifo_order_batch();
void closing_aborts_priority_queued_client_requests();
+ std::vector<document::BucketSpace> _bucketSpaces;
+
public:
void setUp() override {
createLinks();
+ _bucketSpaces = getBucketSpaces();
};
void tearDown() override {
@@ -197,6 +205,13 @@ private:
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
+Distributor_Test::Distributor_Test()
+ : CppUnit::TestFixture(),
+ DistributorTestUtil(),
+ _bucketSpaces()
+{
+}
+
void
Distributor_Test::testOperationGeneration()
{
@@ -752,19 +767,23 @@ void Distributor_Test::sendDownClusterStateCommand() {
}
void Distributor_Test::replyToSingleRequestBucketInfoCommandWith1Bucket() {
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
- CPPUNIT_ASSERT_EQUAL(api::MessageType::REQUESTBUCKETINFO,
- _sender.commands[0]->getType());
- auto& bucketReq(static_cast<api::RequestBucketInfoCommand&>(
- *_sender.commands[0]));
- auto bucketReply = bucketReq.makeReply();
- // Make sure we have a bucket to route our remove op to, or we'd get
- // an immediate reply anyway.
- dynamic_cast<api::RequestBucketInfoReply&>(*bucketReply)
- .getBucketInfo().push_back(
- api::RequestBucketInfoReply::Entry(document::BucketId(1, 1),
- api::BucketInfo(20, 10, 12, 50, 60, true, true)));
- _distributor->handleMessage(std::move(bucketReply));
+ CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ for (uint32_t i = 0; i < _sender.commands.size(); ++i) {
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::REQUESTBUCKETINFO,
+ _sender.commands[i]->getType());
+ auto& bucketReq(static_cast<api::RequestBucketInfoCommand&>
+ (*_sender.commands[i]));
+ auto bucketReply = bucketReq.makeReply();
+ if (bucketReq.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ // Make sure we have a bucket to route our remove op to, or we'd get
+ // an immediate reply anyway.
+ dynamic_cast<api::RequestBucketInfoReply&>(*bucketReply)
+ .getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(document::BucketId(1, 1),
+ api::BucketInfo(20, 10, 12, 50, 60, true, true)));
+ }
+ _distributor->handleMessage(std::move(bucketReply));
+ }
_sender.commands.clear();
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index fedd836513b..8aa9ffadebe 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -376,4 +376,14 @@ DistributorTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
}
+std::vector<document::BucketSpace>
+DistributorTestUtil::getBucketSpaces() const
+{
+ std::vector<document::BucketSpace> res;
+ for (const auto &repo : getBucketSpaceRepo()) {
+ res.push_back(repo.first);
+ }
+ return res;
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 19da0483165..45aaf6b1dc7 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -166,6 +166,7 @@ public:
BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
+ std::vector<document::BucketSpace> getBucketSpaces() const;
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 945ccfa1484..7103a89229d 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -9,11 +9,13 @@
#include <vespa/storageapi/message/visitor.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <tests/distributor/distributortestutil.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
+using document::FixedBucketSpaces;
namespace storage {
namespace distributor {
@@ -22,9 +24,14 @@ class IdealStateManagerTest : public CppUnit::TestFixture,
public DistributorTestUtil
{
public:
- IdealStateManagerTest() {}
+ IdealStateManagerTest()
+ : CppUnit::TestFixture(),
+ DistributorTestUtil(),
+ _bucketSpaces()
+ {}
void setUp() override {
createLinks();
+ _bucketSpaces = getBucketSpaces();
};
void tearDown() override {
@@ -54,6 +61,9 @@ public:
CPPUNIT_TEST(testBlockIdealStateOpsOnFullRequestBucketInfo);
CPPUNIT_TEST(testBlockCheckForAllOperationsToSpecificBucket);
CPPUNIT_TEST_SUITE_END();
+private:
+ std::vector<document::BucketSpace> _bucketSpaces;
+ std::string makeBucketStatusString(const std::string &defaultSpaceBucketStatus);
};
CPPUNIT_TEST_SUITE_REGISTRATION(IdealStateManagerTest);
@@ -91,8 +101,7 @@ IdealStateManagerTest::testStatusPage() {
std::ostringstream ost;
getIdealStateManager().getBucketStatus(ost);
- CPPUNIT_ASSERT_EQUAL(std::string("<h2>default - BucketSpace(0x0000000000000001)</h2>\n"
- "BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]<br>\n"
+ CPPUNIT_ASSERT_EQUAL(makeBucketStatusString("BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]<br>\n"
"<b>BucketId(0x4000000000000005):</b> <i> : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is "
"higher than the configured limit of (100, 1000000)]</i> [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true,"
"active=true,ready=false)]<br>\n"),
@@ -113,8 +122,7 @@ IdealStateManagerTest::testDisabledStateChecker() {
std::ostringstream ost;
getIdealStateManager().getBucketStatus(ost);
- CPPUNIT_ASSERT_EQUAL(std::string(
- "<h2>default - BucketSpace(0x0000000000000001)</h2>\n"
+ CPPUNIT_ASSERT_EQUAL(makeBucketStatusString(
"BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]<br>\n"
"<b>BucketId(0x4000000000000005):</b> <i> : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is "
"higher than the configured limit of (100, 1000000)]</i> [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true,"
@@ -261,6 +269,19 @@ IdealStateManagerTest::testBlockCheckForAllOperationsToSpecificBucket()
}
}
+std::string
+IdealStateManagerTest::makeBucketStatusString(const std::string &defaultSpaceBucketStatus)
+{
+ std::ostringstream ost;
+ for (const auto &bucketSpace : _bucketSpaces) {
+ ost << "<h2>" << FixedBucketSpaces::to_string(bucketSpace) << " - " << bucketSpace << "</h2>\n";
+ if (bucketSpace == FixedBucketSpaces::default_space()) {
+ ost << defaultSpaceBucketStatus;
+ }
+ }
+ return ost.str();
+}
+
} // distributor
} // storage