diff options
Diffstat (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/bucketdbupdatertest.cpp | 2296 |
1 files changed, 2296 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp new file mode 100644 index 00000000000..a1c933d2606 --- /dev/null +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -0,0 +1,2296 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <iomanip> +#include <iostream> +#include <memory> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/vdslib/state/random.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storage/distributor/pendingclusterstate.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/storageframework.h> +#include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/storage/storageutil/distributorstatecache.h> +#include <tests/distributor/distributortestutil.h> +#include <tests/distributor/messagesenderstub.h> +#include <vespa/storage/distributor/simpleclusterinformation.h> + +#include <iostream> +#include <fstream> +#include <string> + +using namespace storage::api; +using namespace storage::lib; + +namespace storage { +namespace distributor { + +class BucketDBUpdaterTest : public CppUnit::TestFixture, + public DistributorTestUtil +{ + CPPUNIT_TEST_SUITE(BucketDBUpdaterTest); + CPPUNIT_TEST(testNormalUsage); // Make sure that bucketdbupdater sends requests to nodes, send responses back for 3 nodes, check that bucketdb is in correct state + CPPUNIT_TEST(testDistributorChange); + CPPUNIT_TEST(testDistributorChangeWithGrouping); + CPPUNIT_TEST(testNormalUsageInitializing); // Check that we send 
request bucket info when storage node is initializing, and send another when it's up. + CPPUNIT_TEST(testFailedRequestBucketInfo); + CPPUNIT_TEST(testBitChange); // Check what happens when distribution bits change + CPPUNIT_TEST(testNodeDown); + CPPUNIT_TEST(testStorageNodeInMaintenanceClearsBucketsForNode); + CPPUNIT_TEST(testNodeDownCopiesGetInSync); + CPPUNIT_TEST(testDownWhileInit); + CPPUNIT_TEST(testInitializingWhileRecheck); + CPPUNIT_TEST(testRecheckNode); + CPPUNIT_TEST(testRecheckNodeWithFailure); + CPPUNIT_TEST(testNotifyBucketChange); + CPPUNIT_TEST(testNotifyBucketChangeFromNodeDown); + CPPUNIT_TEST(testNotifyChangeWithPendingStateQueuesBucketInfoRequests); + CPPUNIT_TEST(testMergeReply); + CPPUNIT_TEST(testMergeReplyNodeDown); + CPPUNIT_TEST(testMergeReplyNodeDownAfterRequestSent); + CPPUNIT_TEST(testFlush); + CPPUNIT_TEST(testPendingClusterStateSendMessages); + CPPUNIT_TEST(testPendingClusterStateReceive); + CPPUNIT_TEST(testPendingClusterStateMerge); + CPPUNIT_TEST(testPendingClusterStateMergeReplicaChanged); + CPPUNIT_TEST(testPendingClusterStateWithGroupDown); + CPPUNIT_TEST(testPendingClusterStateWithGroupDownAndNoHandover); + CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInCurrentState); + CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInPendingState); + CPPUNIT_TEST(testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending); + CPPUNIT_TEST(testChangedDistributionConfigTriggersRecoveryMode); + CPPUNIT_TEST(testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp); + CPPUNIT_TEST(testNewerMutationsNotOverwrittenByEarlierBucketFetch); + CPPUNIT_TEST(preemptedDistrChangeCarriesNodeSetOverToNextStateFetch); + CPPUNIT_TEST(preemptedStorChangeCarriesNodeSetOverToNextStateFetch); + CPPUNIT_TEST(preemptedStorageNodeDownMustBeReFetched); + CPPUNIT_TEST(outdatedNodeSetClearedAfterSuccessfulStateCompletion); + CPPUNIT_TEST(doNotSendToPreemptedNodeNowInDownState); + CPPUNIT_TEST(doNotSendToPreemptedNodeNotPartOfNewState); + 
CPPUNIT_TEST_DISABLED(clusterConfigDownsizeOnlySendsToAvailableNodes); + CPPUNIT_TEST(changedDiskSetTriggersReFetch); + CPPUNIT_TEST(nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer); + CPPUNIT_TEST_SUITE_END(); + +protected: + void testNormalUsage(); + void testDistributorChange(); + void testDistributorChangeWithGrouping(); + void testNormalUsageInitializing(); + void testFailedRequestBucketInfo(); + void testNoResponses(); + void testBitChange(); + void testInconsistentChecksum(); + void testAddEmptyNode(); + void testNodeDown(); + void testStorageNodeInMaintenanceClearsBucketsForNode(); + void testNodeDownCopiesGetInSync(); + void testDownWhileInit(); + void testInitializingWhileRecheck(); + void testRecheckNode(); + void testRecheckNodeWithFailure(); + void testNotifyBucketChange(); + void testNotifyBucketChangeFromNodeDown(); + void testNotifyChangeWithPendingStateQueuesBucketInfoRequests(); + void testMergeReply(); + void testMergeReplyNodeDown(); + void testMergeReplyNodeDownAfterRequestSent(); + void testFlush(); + void testPendingClusterStateSendMessages(); + void testPendingClusterStateReceive(); + void testPendingClusterStateMerge(); + void testPendingClusterStateMergeReplicaChanged(); + void testPendingClusterStateWithGroupDown(); + void testPendingClusterStateWithGroupDownAndNoHandover(); + void testNoDbResurrectionForBucketNotOwnedInCurrentState(); + void testNoDbResurrectionForBucketNotOwnedInPendingState(); + void testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending(); + void testChangedDistributionConfigTriggersRecoveryMode(); + void testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp(); + void testNewerMutationsNotOverwrittenByEarlierBucketFetch(); + void preemptedDistrChangeCarriesNodeSetOverToNextStateFetch(); + void preemptedStorChangeCarriesNodeSetOverToNextStateFetch(); + void preemptedStorageNodeDownMustBeReFetched(); + void outdatedNodeSetClearedAfterSuccessfulStateCompletion(); + void 
doNotSendToPreemptedNodeNowInDownState(); + void doNotSendToPreemptedNodeNotPartOfNewState(); + void clusterConfigDownsizeOnlySendsToAvailableNodes(); + void changedDiskSetTriggersReFetch(); + void nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer(); + + bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const; + + ClusterInformation::CSP createClusterInfo(const std::string& clusterState) { + ClusterInformation::CSP clusterInfo( + new SimpleClusterInformation( + getBucketDBUpdater().getDistributorComponent().getIndex(), + getBucketDBUpdater().getDistributorComponent().getDistribution(), + lib::ClusterState(clusterState), + "ui")); + return clusterInfo; + } + +public: + void setUp() { + createLinks(); + }; + + void tearDown() { + close(); + } + + std::shared_ptr<RequestBucketInfoReply> getFakeBucketReply( + const lib::ClusterState& state, + RequestBucketInfoCommand& cmd, + int storageIndex, + int bucketCount, + int invalidBucketCount = 0) + { + RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd); + sreply->setAddress(storageAddress(storageIndex)); + + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + for (int i=0; i<bucketCount + invalidBucketCount; i++) { + if (!getBucketDBUpdater().getDistributorComponent() + .ownsBucketInState(state, document::BucketId(16, i))) { + continue; + } + + std::vector<uint16_t> nodes; + getBucketDBUpdater().getDistributorComponent() + .getDistribution().getIdealNodes( + lib::NodeType::STORAGE, + state, + document::BucketId(16, i), + nodes); + + for (uint32_t j=0; j<nodes.size(); j++) { + if (nodes[j] == storageIndex) { + if (i >= bucketCount) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo())); + } else { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } + } + } + } + + return std::shared_ptr<api::RequestBucketInfoReply>(sreply); + } + + void fakeBucketReply( + 
const lib::ClusterState& state, + RequestBucketInfoCommand& cmd, + int storageIndex, + int bucketCount, + int invalidBucketCount = 0) + { + getBucketDBUpdater().onRequestBucketInfoReply( + getFakeBucketReply(state, + cmd, + storageIndex, + bucketCount, + invalidBucketCount)); + } + + void sendFakeReplyForSingleBucketRequest( + const api::RequestBucketInfoCommand& rbi) + { + CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); + const document::BucketId& bucket(rbi.getBuckets()[0]); + + std::shared_ptr<api::RequestBucketInfoReply> reply( + new api::RequestBucketInfoReply(rbi)); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(bucket, + api::BucketInfo(20, 10, 12, 50, 60, true, true))); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + } + + std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { + BucketDatabase::Entry entry = getBucketDatabase().get(id); + if (!entry.valid()) { + return vespalib::make_string("%s doesn't exist in DB", + id.toString().c_str()); + } + + std::vector<uint16_t> nodes; + getBucketDBUpdater().getDistributorComponent().getDistribution().getIdealNodes( + lib::NodeType::STORAGE, + state, + document::BucketId(id), + nodes); + + if (nodes.size() != entry->getNodeCount()) { + return vespalib::make_string("Bucket Id %s has %d nodes in " + "ideal state, but has only %d in DB", + id.toString().c_str(), + (int)nodes.size(), + (int)entry->getNodeCount()); + } + + for (uint32_t i = 0; i<nodes.size(); i++) { + bool found = false; + + for (uint32_t j = 0; j<entry->getNodeCount(); j++) { + if (nodes[i] == entry->getNodeRef(j).getNode()) { + found = true; + } + } + + if (!found) { + return vespalib::make_string( + "Bucket Id %s has no copy from node %d", + id.toString().c_str(), + nodes[i]); + } + } + + return ""; + } + + + void verifyInvalid(document::BucketId id, int storageNode) { + BucketDatabase::Entry entry = getBucketDatabase().get(id); + + CPPUNIT_ASSERT(entry.valid()); + + bool found 
= false; + for (uint32_t j = 0; j<entry->getNodeCount(); j++) { + if (entry->getNodeRef(j).getNode() == storageNode) { + CPPUNIT_ASSERT(!entry->getNodeRef(j).valid()); + found = true; + } + } + + CPPUNIT_ASSERT(found); + } + + struct OrderByIncreasingNodeIndex { + template <typename T> + bool operator()(const T& lhs, const T& rhs) { + return (lhs->getAddress()->getIndex() + < rhs->getAddress()->getIndex()); + } + }; + + void sortSentMessagesByIndex(MessageSenderStub& sender, + size_t sortFromOffset = 0) + { + std::sort(sender.commands.begin() + sortFromOffset, + sender.commands.end(), + OrderByIncreasingNodeIndex()); + } + + void setSystemState(const lib::ClusterState& state) { + const size_t sizeBeforeState = _sender.commands.size(); + getBucketDBUpdater().onSetSystemState( + std::shared_ptr<api::SetSystemStateCommand>( + new api::SetSystemStateCommand(state))); + // A lot of test logic has the assumption that all messages sent as a + // result of cluster state changes will be in increasing index order + // (for simplicity, not because this is required for correctness). + // Only sort the messages that arrived as a result of the state, don't + // jumble the sorting with any existing messages. 
+ sortSentMessagesByIndex(_sender, sizeBeforeState); + } + + void setAndEnableClusterState(const lib::ClusterState& state, + uint32_t expectedMsgs, + uint32_t nBuckets) { + _sender.clear(); + setSystemState(state); + CPPUNIT_ASSERT_EQUAL(size_t(expectedMsgs), _sender.commands.size()); + + for (uint32_t i = 0; i < _sender.commands.size(); i++) { + CPPUNIT_ASSERT(_sender.commands[i]->getType() == + MessageType::REQUESTBUCKETINFO); + + const api::StorageMessageAddress& address( + *_sender.commands[i]->getAddress()); + fakeBucketReply( + state, + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]), + address.getIndex(), + nBuckets); + } + } + + + void setStorageNodes(uint32_t numStorageNodes) { + _sender.clear(); + + lib::ClusterState newState( + vespalib::make_string("distributor:1 storage:%d", numStorageNodes)); + + setSystemState(newState); + + for (uint32_t i=0; i<numStorageNodes; i++) { + CPPUNIT_ASSERT(_sender.commands[i]->getType() == + MessageType::REQUESTBUCKETINFO); + + const api::StorageMessageAddress *address = _sender.commands[i]->getAddress(); + CPPUNIT_ASSERT_EQUAL(i, (uint32_t)address->getIndex()); + } + } + + void initializeNodesAndBuckets(uint32_t numStorageNodes, + uint32_t numBuckets) + { + setStorageNodes(numStorageNodes); + + vespalib::string state(vespalib::make_string( + "distributor:1 storage:%d", numStorageNodes)); + lib::ClusterState newState(state); + + for (uint32_t i=0; i<numStorageNodes; i++) { + fakeBucketReply(newState, + *((RequestBucketInfoCommand*)_sender.commands[i].get()), + i, + numBuckets); + } + assertCorrectBuckets(numBuckets, state); + } + + bool bucketHasNode(document::BucketId id, uint16_t node) const { + BucketDatabase::Entry entry = getBucket(id); + CPPUNIT_ASSERT(entry.valid()); + + for (uint32_t j=0; j<entry->getNodeCount(); j++) { + if (entry->getNodeRef(j).getNode() == node) { + return true; + } + } + + return false; + } + + api::StorageMessageAddress storageAddress(uint16_t node) { + return 
api::StorageMessageAddress("storage", lib::NodeType::STORAGE, node); + } + + std::string getSentNodes(const std::string& oldClusterState, + const std::string& newClusterState); + + std::string getSentNodesDistributionChanged( + const std::string& oldClusterState); + + std::vector<uint16_t> getSentNodesWithPreemption( + const std::string& oldClusterState, + uint32_t expectedOldStateMessages, + const std::string& preemptedClusterState, + const std::string& newClusterState); + + std::vector<uint16_t> getSendSet() const; + + std::string mergeBucketLists( + const lib::ClusterState& oldState, + const std::string& existingData, + const lib::ClusterState& newState, + const std::string& newData, + bool includeBucketInfo = false); + + std::string mergeBucketLists( + const std::string& existingData, + const std::string& newData, + bool includeBucketInfo = false); + + void assertCorrectBuckets(int numBuckets, const std::string& stateStr) { + lib::ClusterState state(stateStr); + for (int i=0; i<numBuckets; i++) { + CPPUNIT_ASSERT_EQUAL( + getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); + } + } + + void setDistribution(const std::string& distConfig) { + lib::Distribution* distribution = new lib::Distribution(distConfig); + _node->getComponentRegister().setDistribution( + lib::Distribution::SP(distribution)); + } + + std::string getDistConfig6Nodes3Groups() const { + return ("redundancy 2\n" + "group[3]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[3]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[1].nodes[2].index 2\n" + "group[2].name rack1\n" + "group[2].index 1\n" + "group[2].nodes[3]\n" + "group[2].nodes[0].index 3\n" + "group[2].nodes[1].index 4\n" + "group[2].nodes[2].index 5\n"); + } + + std::string getDistConfig6Nodes4Groups() const { + return ("redundancy 2\n" + 
"group[4]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[2]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[2].name rack1\n" + "group[2].index 1\n" + "group[2].nodes[2]\n" + "group[2].nodes[0].index 2\n" + "group[2].nodes[1].index 3\n" + "group[3].name rack2\n" + "group[3].index 2\n" + "group[3].nodes[2]\n" + "group[3].nodes[0].index 4\n" + "group[3].nodes[1].index 5\n"); + } + + std::string getDistConfig3Nodes1Group() const { + return ("redundancy 2\n" + "group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[3]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[1].nodes[2].index 2\n"); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest); + +void +BucketDBUpdaterTest::testNormalUsage() +{ + setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); + + CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + + // Ensure distribution hash is set correctly + CPPUNIT_ASSERT_EQUAL( + getBucketDBUpdater().getDistributorComponent().getDistribution() + .getNodeGraph().getDistributionConfigHash(), + dynamic_cast<const RequestBucketInfoCommand&>( + *_sender.commands[0]).getDistributionHash()); + + fakeBucketReply( + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]), + 0, 10); + + _sender.clear(); + + // Optimization for not refetching unneeded data after cluster state + // change is only implemented after completion of previous cluster state + setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); + + CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + // Expect reply of first set SystemState request. 
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + + for (uint32_t i = 0; i < 3; ++i) { + fakeBucketReply( + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]), + i, 10); + } + + assertCorrectBuckets(10, "distributor:2 storage:3"); +} + +void +BucketDBUpdaterTest::testDistributorChange() +{ + int numBuckets = 100; + + // First sends request + setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); + CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + for (uint32_t i = 0; i < 3; ++i) { + fakeBucketReply( + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]), + i, numBuckets); + } + _sender.clear(); + + // No change from initializing to up (when done with last job) + setSystemState(lib::ClusterState("distributor:2 storage:3")); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + _sender.clear(); + + // Adding node. No new read requests, but buckets thrown + setSystemState(lib::ClusterState("distributor:3 storage:3")); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + assertCorrectBuckets(numBuckets, "distributor:3 storage:3"); + _sender.clear(); + + // Removing distributor. Need to refetch new data from all nodes. 
+ setSystemState(lib::ClusterState("distributor:2 storage:3")); + CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + for (uint32_t i = 0; i < 3; ++i) { + fakeBucketReply( + lib::ClusterState("distributor:2 storage:3"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]), + i, numBuckets); + } + _sender.clear(); + assertCorrectBuckets(numBuckets, "distributor:2 storage:3"); +} + +void +BucketDBUpdaterTest::testDistributorChangeWithGrouping() +{ + std::string distConfig(getDistConfig6Nodes3Groups()); + setDistribution(distConfig); + _distributor->enableNextDistribution(); + int numBuckets = 100; + + setSystemState(lib::ClusterState("distributor:6 storage:6")); + CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); + for (uint32_t i = 0; i < 6; ++i) { + fakeBucketReply( + lib::ClusterState("distributor:6 storage:6"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]), + i, numBuckets); + } + _sender.clear(); + + // Distributor going down in other group, no change + setSystemState(lib::ClusterState("distributor:6 .5.s:d storage:6")); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + _sender.clear(); + + setSystemState(lib::ClusterState("distributor:6 storage:6")); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); + _sender.clear(); + + // Unchanged grouping cause no change. 
+ setDistribution(distConfig); + _distributor->storageDistributionChanged(); + _distributor->enableNextDistribution(); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + + // Changed grouping cause change + setDistribution(getDistConfig6Nodes4Groups()); + _distributor->storageDistributionChanged(); + _distributor->enableNextDistribution(); + + CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size()); +} + +void +BucketDBUpdaterTest::testNormalUsageInitializing() +{ + setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + + // Not yet passing on system state. + CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + + fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[0].get()), + 0, + 10, + 10); + + assertCorrectBuckets(10, "distributor:1 storage:1"); + + for (int i=10; i<20; i++) { + verifyInvalid(document::BucketId(16, i), 0); + } + + // Pass on cluster state and recheck buckets now. + CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); + + _sender.clear(); + _senderDown.clear(); + + setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); + + // Send a new request bucket info up. + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + + fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[0].get()), + 0, + 20); + + // Pass on cluster state and recheck buckets now. + CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); + + assertCorrectBuckets(20, "distributor:1 storage:1"); +} + +void +BucketDBUpdaterTest::testFailedRequestBucketInfo() +{ + setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); + + // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. 
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + + { + std::shared_ptr<api::RequestBucketInfoReply> reply = + getFakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[0].get()), + 0, + 10); + + reply->setResult(api::ReturnCode::NOT_CONNECTED); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + // Trigger that delayed message is sent + getClock().addSecondsToTime(10); + getBucketDBUpdater().resendDelayedMessages(); + } + + // Should be resent. + CPPUNIT_ASSERT_EQUAL(std::string("Request bucket info," + "Request bucket info"), + _sender.getCommands()); + + CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + + fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *((RequestBucketInfoCommand*)_sender.commands[1].get()), + 0, + 10); + + for (int i=0; i<10; i++) { + CPPUNIT_ASSERT_EQUAL( + std::string(""), + verifyBucket(document::BucketId(16, i), + lib::ClusterState("distributor:1 storage:1"))); + } + + // Set system state should now be passed on + CPPUNIT_ASSERT_EQUAL(std::string("Set system state"), + _senderDown.getCommands()); +} + +void +BucketDBUpdaterTest::testDownWhileInit() +{ + setStorageNodes(3); + + fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *((RequestBucketInfoCommand*)_sender.commands[0].get()), + 0, + 5); + + setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); + + fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *((RequestBucketInfoCommand*)_sender.commands[2].get()), + 2, + 5); + + fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *((RequestBucketInfoCommand*)_sender.commands[1].get()), + 1, + 5); +} + +bool +BucketDBUpdaterTest::bucketExistsThatHasNode(int bucketCount, uint16_t node) const +{ + for (int i=1; i<bucketCount; i++) { + if (bucketHasNode(document::BucketId(16, i), node)) { + return true; + } + } + + return false; +} + +void +BucketDBUpdaterTest::testNodeDown() +{ + 
setStorageNodes(3); + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + + for (int i=1; i<100; i++) { + addIdealNodes(document::BucketId(16, i)); + } + + CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1)); + + setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); + + CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1)); +} + +void +BucketDBUpdaterTest::testStorageNodeInMaintenanceClearsBucketsForNode() +{ + setStorageNodes(3); + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + + for (int i=1; i<100; i++) { + addIdealNodes(document::BucketId(16, i)); + } + + CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1)); + + setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:m")); + + CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1)); +} + +void +BucketDBUpdaterTest::testNodeDownCopiesGetInSync() +{ + setStorageNodes(3); + + lib::ClusterState systemState("distributor:1 storage:3"); + document::BucketId bid(16, 1); + + addNodesToBucketDB(bid, "0=3,1=2,2=3"); + + setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false), " + "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false)"), + dumpBucket(bid)); +} + +void +BucketDBUpdaterTest::testInitializingWhileRecheck() +{ + lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); + setSystemState(systemState); + + CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + + getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + + for (int i=0; i<2; i++) { + fakeBucketReply(systemState, + *((RequestBucketInfoCommand*)_sender.commands[i].get()), + i, + 100); + } + + // Now we can pass on system state. 
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); + + CPPUNIT_ASSERT_EQUAL(MessageType::SETSYSTEMSTATE, + _senderDown.commands[0]->getType()); +} + +void +BucketDBUpdaterTest::testBitChange() +{ + + std::vector<document::BucketId> bucketlist; + + { + setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2")); + + CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); + + CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); + + RequestBucketInfoReply* sreply = + new RequestBucketInfoReply(*((RequestBucketInfoCommand*)_sender.commands[0].get())); + sreply->setAddress(storageAddress(0)); + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + + int cnt=0; + for (int i=0; cnt < 2; i++) { + lib::Distribution distribution = getBucketDBUpdater().getDistributorComponent() + .getDistribution(); + std::vector<uint16_t> distributors; + if (distribution.getIdealDistributorNode( + lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + document::BucketId(16, i)) + == 0) + { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + + bucketlist.push_back(document::BucketId(16, i)); + cnt++; + } + } + + getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr<RequestBucketInfoReply>(sreply)); + } + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(bucketlist[0])); + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(bucketlist[1])); + + { + _sender.clear(); + setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2")); + + CPPUNIT_ASSERT_EQUAL(1, (int)_sender.commands.size()); + + CPPUNIT_ASSERT(_sender.commands[0]->getType() == MessageType::REQUESTBUCKETINFO); + + RequestBucketInfoReply* sreply = + new 
RequestBucketInfoReply( + *((RequestBucketInfoCommand*)_sender.commands[0].get())); + sreply->setAddress(storageAddress(0)); + sreply->setResult(api::ReturnCode::OK); + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + for (uint32_t i = 0; i < 3; ++i) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } + + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, 4), + api::BucketInfo(10,1,1))); + + getBucketDBUpdater().onRequestBucketInfoReply( + std::shared_ptr<RequestBucketInfoReply>(sreply)); + } + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000000) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(document::BucketId(16, 0))); + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(document::BucketId(16, 1))); + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(document::BucketId(16, 2))); + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000004) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false)"), + dumpBucket(document::BucketId(16, 4))); + + { + _sender.clear(); + setSystemState(lib::ClusterState("storage:1 distributor:2 .1.s:i")); + } + + { + _sender.clear(); + setSystemState(lib::ClusterState("storage:1 distributor:2")); + } +}; + +void +BucketDBUpdaterTest::testRecheckNodeWithFailure() +{ + initializeNodesAndBuckets(3, 5); + + _sender.clear(); + + getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + + + uint16_t index = 0; + { + api::RequestBucketInfoCommand& rbi( + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); + CPPUNIT_ASSERT_EQUAL(size_t(1), 
rbi.getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); + auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + + const api::StorageMessageAddress *address = _sender.commands[0]->getAddress(); + index = address->getIndex(); + + reply->setResult(api::ReturnCode::NOT_CONNECTED); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + // Trigger that delayed message is sent + getClock().addSecondsToTime(10); + getBucketDBUpdater().resendDelayedMessages(); + } + + CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + + setSystemState( + lib::ClusterState(vespalib::make_string("distributor:1 storage:3 .%d.s:d", index))); + + // Recheck bucket. + { + api::RequestBucketInfoCommand& rbi(dynamic_cast<RequestBucketInfoCommand&> + (*_sender.commands[1])); + CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); + auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + } + + // Should not retry since node is down. 
+ CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); +} + +void +BucketDBUpdaterTest::testRecheckNode() +{ + initializeNodesAndBuckets(3, 5); + + _sender.clear(); + + getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3)); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + + api::RequestBucketInfoCommand& rbi( + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); + CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); + + auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(document::BucketId(16, 3), + api::BucketInfo(20, 10, 12, 50, 60, true, true))); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + + lib::ClusterState state("distributor:1 storage:3"); + for (uint32_t i = 0; i < 3; i++) { + CPPUNIT_ASSERT_EQUAL( + getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); + } + + for (uint32_t i = 4; i < 5; i++) { + CPPUNIT_ASSERT_EQUAL( + getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); + } + + BucketDatabase::Entry entry = getBucketDatabase().get(document::BucketId(16, 3)); + CPPUNIT_ASSERT(entry.valid()); + + const BucketCopy* copy = entry->getNode(1); + CPPUNIT_ASSERT(copy != 0); + CPPUNIT_ASSERT_EQUAL(api::BucketInfo(20,10,12, 50, 60, true, true), + copy->getBucketInfo()); +} + +void +BucketDBUpdaterTest::testNotifyBucketChange() +{ + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:1")); + + addNodesToBucketDB(document::BucketId(16, 1), "0=1234"); + _sender.replies.clear(); + + { + api::BucketInfo info(1, 2, 3, 4, 5, true, true); + auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( + document::BucketId(16, 1), info)); + cmd->setSourceIndex(0); + getBucketDBUpdater().onNotifyBucketChange(cmd); + } + + { + api::BucketInfo info(10, 11, 12, 13, 14, 
false, false); + auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( + document::BucketId(16, 2), info)); + cmd->setSourceIndex(0); + getBucketDBUpdater().onNotifyBucketChange(cmd); + } + + // Must receive reply + CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.replies.size()); + + for (int i = 0; i < 2; ++i) { + CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY, + _sender.replies[i]->getType()); + } + + // No database update until request bucket info replies have been received. + CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," + "trusted=false,active=false)"), + dumpBucket(document::BucketId(16, 1))); + CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), + dumpBucket(document::BucketId(16, 2))); + + CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + + std::vector<api::BucketInfo> infos; + infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true)); + infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)); + + for (int i = 0; i < 2; ++i) { + api::RequestBucketInfoCommand& rbi( + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i])); + CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, i + 1), rbi.getBuckets()[0]); + + auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(document::BucketId(16, i + 1), + infos[i])); + getBucketDBUpdater().onRequestBucketInfoReply(reply); + } + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true)"), + dumpBucket(document::BucketId(16, 1))); + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false)"), + dumpBucket(document::BucketId(16, 2))); + +} + +void 
BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown()
{
    // Verifies that a NotifyBucketChange from a node that subsequently goes
    // down is replied to, but does not end up mutating the database entry
    // (the bucket is hosted on node 1; the notification comes from node 0).
    _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2"));

    addNodesToBucketDB(document::BucketId(16, 1), "1=1234");

    _sender.replies.clear();

    {
        api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
        auto cmd(std::make_shared<api::NotifyBucketChangeCommand>(
                document::BucketId(16, 1), info));
        cmd->setSourceIndex(0);
        getBucketDBUpdater().onNotifyBucketChange(cmd);
    }
    // Enable here to avoid having request bucket info be silently swallowed
    // (sendRequestBucketInfo drops message if node is down).
    _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:2 .0.s:d"));

    // DB entry is untouched by the notification itself.
    CPPUNIT_ASSERT_EQUAL(
            std::string("BucketId(0x4000000000000001) : "
                        "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false)"),
            dumpBucket(document::BucketId(16, 1)));

    // The notification itself must still be acknowledged.
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
    CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY,
                         _sender.replies[0]->getType());

    // Currently, this pending operation will be auto-flushed when the cluster state
    // changes so the behavior is still correct. Keep this test around to prevent
    // regressions here.
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
    api::RequestBucketInfoCommand& rbi(
            dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
    CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
    CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]);

    auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
    reply->getBucketInfo().push_back(
            api::RequestBucketInfoReply::Entry(
                    document::BucketId(16, 1),
                    api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)));
    getBucketDBUpdater().onRequestBucketInfoReply(reply);

    // No change
    CPPUNIT_ASSERT_EQUAL(
            std::string("BucketId(0x4000000000000001) : "
                        "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false)"),
            dumpBucket(document::BucketId(16, 1)));
}

/**
 * Test that NotifyBucketChange received while there's a pending cluster state
 * waits until the cluster state has been enabled as current before it sends off
 * the single bucket info requests. This is to prevent a race condition where
 * the replies to bucket info requests for buckets that would be owned by the
 * distributor in the pending state but not by the current state would be
 * discarded when attempted inserted into the bucket database.
 */
void
BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
{
    setSystemState(lib::ClusterState("distributor:1 storage:1"));
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());

    {
        api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
        auto cmd(std::make_shared<api::NotifyBucketChangeCommand>(
                document::BucketId(16, 1), info));
        cmd->setSourceIndex(0);
        getBucketDBUpdater().onNotifyBucketChange(cmd);
    }

    // Still only the full-fetch command; the notify-triggered request is
    // queued while the cluster state is pending.
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());

    fakeBucketReply(
            lib::ClusterState("distributor:1 storage:1"),
            dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]),
            0, 10);

    // Once the pending state resolves, the queued single-bucket request is
    // sent as command #2.
    CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());

    {
        api::RequestBucketInfoCommand& rbi(
                dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[1]));
        CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
        CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]);
    }
    _sender.clear();

    // Queue must be cleared once pending state is enabled.
    {
        lib::ClusterState state("distributor:1 storage:2");
        uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
        setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn);
    }
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
    {
        // An empty bucket list means "full fetch", i.e. no queued
        // single-bucket request leaked into the new transition.
        api::RequestBucketInfoCommand& rbi(
                dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
        CPPUNIT_ASSERT_EQUAL(size_t(0), rbi.getBuckets().size());
    }
}

// Verifies that a MergeBucketReply triggers a re-fetch of bucket info from
// every node that took part in the merge, and that the returned info is
// merged into the database.
void
BucketDBUpdaterTest::testMergeReply()
{
    _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));

    addNodesToBucketDB(document::BucketId(16, 1234),
                       "0=1234,1=1234,2=1234");

    std::vector<api::MergeBucketCommand::Node> nodes;
    nodes.push_back(api::MergeBucketCommand::Node(0));
    nodes.push_back(api::MergeBucketCommand::Node(1));
    nodes.push_back(api::MergeBucketCommand::Node(2));

    api::MergeBucketCommand cmd(document::BucketId(16, 1234), nodes, 0);

    auto reply(std::make_shared<api::MergeBucketReply>(cmd));

    _sender.clear();
    getBucketDBUpdater().onMergeBucketReply(reply);

    // One info request per merge participant.
    CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());

    for (uint32_t i = 0; i < 3; i++) {
        std::shared_ptr<api::RequestBucketInfoCommand>
            req(std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(
                        _sender.commands[i]));

        CPPUNIT_ASSERT(req.get());
        CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size());
        CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]);

        auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req));
        reqreply->getBucketInfo().push_back(
                api::RequestBucketInfoReply::Entry(document::BucketId(16, 1234),
                        api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1))));

        getBucketDBUpdater().onRequestBucketInfoReply(reqreply);
    }

    CPPUNIT_ASSERT_EQUAL(
            std::string("BucketId(0x40000000000004d2) : "
                        "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false), "
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false), " + "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false)"), + dumpBucket(document::BucketId(16, 1234))); +}; + +void +BucketDBUpdaterTest::testMergeReplyNodeDown() +{ + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + std::vector<api::MergeBucketCommand::Node> nodes; + + addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(document::BucketId(16, 1234), nodes, 0); + + auto reply(std::make_shared<api::MergeBucketReply>(cmd)); + + setSystemState(lib::ClusterState("distributor:1 storage:2")); + + _sender.clear(); + getBucketDBUpdater().onMergeBucketReply(reply); + + CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + + for (uint32_t i = 0; i < 2; i++) { + std::shared_ptr<api::RequestBucketInfoCommand> req( + std::dynamic_pointer_cast<api::RequestBucketInfoCommand>( + _sender.commands[i])); + + CPPUNIT_ASSERT(req.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]); + + auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req)); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + document::BucketId(16, 1234), + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + getBucketDBUpdater().onRequestBucketInfoReply(reqreply); + } + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false)"), + dumpBucket(document::BucketId(16, 1234))); +}; + +void +BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent() +{ + _distributor->enableClusterState(lib::ClusterState("distributor:1 
storage:3")); + std::vector<api::MergeBucketCommand::Node> nodes; + + addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(document::BucketId(16, 1234), nodes, 0); + + auto reply(std::make_shared<api::MergeBucketReply>(cmd)); + + _sender.clear(); + getBucketDBUpdater().onMergeBucketReply(reply); + + CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + + setSystemState(lib::ClusterState("distributor:1 storage:2")); + + for (uint32_t i = 0; i < 3; i++) { + std::shared_ptr<api::RequestBucketInfoCommand> req( + std::dynamic_pointer_cast<api::RequestBucketInfoCommand>( + _sender.commands[i])); + + CPPUNIT_ASSERT(req.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]); + + auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req)); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + document::BucketId(16, 1234), + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + getBucketDBUpdater().onRequestBucketInfoReply(reqreply); + } + + CPPUNIT_ASSERT_EQUAL( + std::string("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false)"), + dumpBucket(document::BucketId(16, 1234))); +}; + + +void +BucketDBUpdaterTest::testFlush() +{ + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + _sender.clear(); + + addNodesToBucketDB(document::BucketId(16, 1234), "0=1234,1=1234,2=1234"); + + std::vector<api::MergeBucketCommand::Node> nodes; + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(document::BucketId(16, 1234), + nodes, + 0); + + auto 
reply(std::make_shared<api::MergeBucketReply>(cmd));

    _sender.clear();
    getBucketDBUpdater().onMergeBucketReply(reply);

    // The merge reply spawns one info request per participating node...
    CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
    CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.replies.size());

    getBucketDBUpdater().flush();
    // Flushing should drop all merge bucket replies
    CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
}

// Simulates a cluster state transition from oldClusterState to
// newClusterState through a standalone PendingClusterState and returns a
// comma-separated list of the node indexes that info requests were sent to.
std::string
BucketDBUpdaterTest::getSentNodes(
        const std::string& oldClusterState,
        const std::string& newClusterState)
{
    MessageSenderStub sender;

    std::shared_ptr<api::SetSystemStateCommand> cmd(
            new api::SetSystemStateCommand(
                    lib::ClusterState(newClusterState)));

    framework::defaultimplementation::FakeClock clock;
    ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));

    std::unordered_set<uint16_t> outdatedNodes;
    std::unique_ptr<PendingClusterState> state(
            PendingClusterState::createForClusterStateChange(
                    clock, clusterInfo, sender, cmd, outdatedNodes,
                    api::Timestamp(1)));

    // Sort for a deterministic, index-ordered result string.
    sortSentMessagesByIndex(sender);

    std::ostringstream ost;
    for (uint32_t i = 0; i < sender.commands.size(); i++) {
        RequestBucketInfoCommand* req =
            dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get());

        if (i > 0) {
            ost << ",";
        }

        ost << req->getAddress()->getIndex();
    }

    return ost.str();
}

// Same as getSentNodes(), but for a distribution (config) change rather than
// a cluster state change.
std::string
BucketDBUpdaterTest::getSentNodesDistributionChanged(
        const std::string& oldClusterState)
{
    MessageSenderStub sender;

    framework::defaultimplementation::FakeClock clock;
    ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
    std::unique_ptr<PendingClusterState> state(
            PendingClusterState::createForDistributionChange(
                    clock, clusterInfo, sender, api::Timestamp(1)));

    sortSentMessagesByIndex(sender);

    std::ostringstream ost;
    for (uint32_t i = 0; i < sender.commands.size(); i++) {
        RequestBucketInfoCommand* req =
            dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get());

        if (i > 0) {
            ost << ",";
        }

        ost << req->getAddress()->getIndex();
    }

    return ost.str();
}

// Table-style verification of which storage nodes are asked for bucket info
// for a wide range of cluster state transitions (nodes coming up, going
// down, initializing, entering maintenance/stopping, distributor set
// changes, per-disk down states, etc.).
void
BucketDBUpdaterTest::testPendingClusterStateSendMessages()
{
    CPPUNIT_ASSERT_EQUAL(
            std::string("0,1,2"),
            getSentNodes("cluster:d",
                         "distributor:1 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("0,1"),
            getSentNodes("cluster:d",
                         "distributor:1 storage:3 .2.s:m"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("2"),
            getSentNodes("distributor:1 storage:2",
                         "distributor:1 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("2,3,4,5"),
            getSentNodes("distributor:1 storage:2",
                         "distributor:1 storage:6"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("0,1,2"),
            getSentNodes("distributor:4 storage:3",
                         "distributor:3 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("0,1,2,3"),
            getSentNodes("distributor:4 storage:3",
                         "distributor:4 .2.s:d storage:4"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:4 storage:3",
                         "distributor:4 .0.s:d storage:4"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:3 storage:3",
                         "distributor:4 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("2"),
            getSentNodes("distributor:3 storage:3 .2.s:i",
                         "distributor:3 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("1"),
            getSentNodes("distributor:3 storage:3 .1.s:d",
                         "distributor:3 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string("1,2,4"),
            getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i",
                         "distributor:3 storage:5"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:1 storage:3",
                         "cluster:d"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:1 storage:3",
                         "distributor:1 storage:3"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:1 storage:3",
                         "cluster:d distributor:1 storage:6"));

    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
getSentNodes("distributor:3 storage:3", + "distributor:3 .2.s:m storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string("0,1,2"), + getSentNodes("distributor:3 .2.s:m storage:3", + "distributor:3 .2.s:d storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string(""), + getSentNodes("distributor:3 .2.s:m storage:3", + "distributor:3 storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string("0,1,2"), + getSentNodesDistributionChanged("distributor:3 storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string("0,1"), + getSentNodes("distributor:10 storage:2", + "distributor:10 .1.s:d storage:2")); + + CPPUNIT_ASSERT_EQUAL( + std::string("1"), + getSentNodes("distributor:2 storage:2", + "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); + + CPPUNIT_ASSERT_EQUAL( + std::string("1"), + getSentNodes("distributor:2 storage:2 .1.s:d", + "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); + + CPPUNIT_ASSERT_EQUAL( + std::string(""), + getSentNodes("distributor:2 storage:2", + "distributor:3 .2.s:i storage:2")); + + CPPUNIT_ASSERT_EQUAL( + std::string("0,1,2"), + getSentNodes("distributor:3 storage:3", + "distributor:3 .2.s:s storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string(""), + getSentNodes("distributor:3 .2.s:s storage:3", + "distributor:3 .2.s:d storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string("1"), + getSentNodes("distributor:3 storage:3 .1.s:m", + "distributor:3 storage:3")); + + CPPUNIT_ASSERT_EQUAL( + std::string(""), + getSentNodes("distributor:3 storage:3", + "distributor:3 storage:3 .1.s:m")); +}; + +void +BucketDBUpdaterTest::testPendingClusterStateReceive() +{ + MessageSenderStub sender; + + auto cmd(std::make_shared<api::SetSystemStateCommand>( + lib::ClusterState("distributor:1 storage:3"))); + + framework::defaultimplementation::FakeClock clock; + ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); + std::unordered_set<uint16_t> outdatedNodes; + std::unique_ptr<PendingClusterState> state( + PendingClusterState::createForClusterStateChange( + clock, 
clusterInfo, sender, cmd, outdatedNodes, + api::Timestamp(1))); + + CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); + + sortSentMessagesByIndex(sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < sender.commands.size(); i++) { + RequestBucketInfoCommand* req = + dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get()); + + RequestBucketInfoReply* rep = + new RequestBucketInfoReply(*req); + + rep->getBucketInfo().push_back( + RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(i, i, i, i, i))); + + CPPUNIT_ASSERT( + state->onRequestBucketInfoReply( + std::shared_ptr<api::RequestBucketInfoReply>(rep))); + + CPPUNIT_ASSERT_EQUAL(i == sender.commands.size() - 1 ? true : false, + state->done()); + } + + CPPUNIT_ASSERT_EQUAL(3, (int)state->results().size()); +} + +void +BucketDBUpdaterTest::testPendingClusterStateWithGroupDown() +{ + std::string config(getDistConfig6Nodes4Groups()); + config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; + setDistribution(config); + + // Group config has nodes {0, 1}, {2, 3}, {4, 5} + // We're node index 0. + + // Entire group 1 goes down. Must refetch from all nodes. + CPPUNIT_ASSERT_EQUAL( + std::string("0,1,2,3,4,5"), + getSentNodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); + + // But don't fetch if not the entire group is down. + CPPUNIT_ASSERT_EQUAL( + std::string(""), + getSentNodes("distributor:6 storage:6", + "distributor:6 .2.s:d storage:6")); +} + +void +BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover() +{ + std::string config(getDistConfig6Nodes4Groups()); + config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; + setDistribution(config); + + // Group is down, but config says to not do anything about it. 
    CPPUNIT_ASSERT_EQUAL(
            std::string(""),
            getSentNodes("distributor:6 storage:6",
                         "distributor:6 .2.s:d .3.s:d storage:6"));
}

// Parses a compact textual fixture ("node:bucket,bucket|node:bucket...",
// optionally with "bucket/crc/docs/size" per entry when includeBucketInfo
// is true) and feeds the resulting bucket copies into the given
// PendingClusterState, marking each listed node as having replied.
void
parseInputData(const std::string& data,
               uint64_t timestamp,
               PendingClusterState& state,
               bool includeBucketInfo)
{
    vespalib::StringTokenizer tokenizer(data, "|");
    for (uint32_t i = 0; i < tokenizer.size(); i++) {
        vespalib::StringTokenizer tok2(tokenizer[i], ":");

        uint16_t node = atoi(tok2[0].c_str());

        state.setNodeReplied(node);

        vespalib::StringTokenizer tok3(tok2[1], ",");
        for (uint32_t j = 0; j < tok3.size(); j++) {
            if (includeBucketInfo) {
                // Entry format: bucket/checksum/doccount/docsize; the doc
                // counts/sizes are reused for the "total" fields as well.
                vespalib::StringTokenizer tok4(tok3[j], "/");

                state.addNodeInfo(
                        document::BucketId(16, atoi(tok4[0].c_str())),
                        BucketCopy(
                                timestamp,
                                node,
                                api::BucketInfo(
                                        atoi(tok4[1].c_str()),
                                        atoi(tok4[2].c_str()),
                                        atoi(tok4[3].c_str()),
                                        atoi(tok4[2].c_str()),
                                        atoi(tok4[3].c_str()))));
            } else {
                // Fixed dummy info when per-bucket info is not of interest.
                state.addNodeInfo(
                        document::BucketId(16, atoi(tok3[j].c_str())),
                        BucketCopy(timestamp,
                                   node,
                                   api::BucketInfo(3, 3, 3, 3, 3)));
            }
        }
    }
}

// EntryProcessor that serializes every database entry to a compact string
// ("rawid:node[,node...]|"), optionally including per-copy bucket info as
// "/checksum/docs/size/<t|u>" (t = trusted).
struct BucketDumper : public BucketDatabase::EntryProcessor
{
    std::ostringstream ost;
    bool _includeBucketInfo;

    BucketDumper(bool includeBucketInfo)
        : _includeBucketInfo(includeBucketInfo)
    {
    }

    bool process(const BucketDatabase::Entry& e) {
        document::BucketId bucketId(e.getBucketId());

        ost << (uint32_t)bucketId.getRawId() << ":";
        for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
            if (i > 0) {
                ost << ",";
            }
            const BucketCopy& copy(e->getNodeRef(i));
            ost << copy.getNode();
            if (_includeBucketInfo) {
                ost << "/" << copy.getChecksum()
                    << "/" << copy.getDocumentCount()
                    << "/" << copy.getTotalDocumentSize()
                    << "/" << (copy.trusted() ? "t" : "u");
            }
        }
        ost << "|";
        return true;
    }
};

// Runs two successive PendingClusterState merges against the bucket
// database: first `existingData` under `oldState`, then `newData` under
// `newState`. Returns the BucketDumper serialization of the resulting
// database, and clears the database afterwards so tests are independent.
std::string
BucketDBUpdaterTest::mergeBucketLists(
        const lib::ClusterState& oldState,
        const std::string& existingData,
        const lib::ClusterState& newState,
        const std::string& newData,
        bool includeBucketInfo)
{
    framework::defaultimplementation::FakeClock clock;
    framework::MilliSecTimer timer(clock);

    MessageSenderStub sender;
    std::unordered_set<uint16_t> outdatedNodes;

    {
        // First merge: seed the DB with the pre-existing data at timestamp 1.
        auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState));

        api::Timestamp beforeTime(1);

        ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
        std::unique_ptr<PendingClusterState> state(
                PendingClusterState::createForClusterStateChange(
                        clock, clusterInfo, sender, cmd, outdatedNodes,
                        beforeTime));

        parseInputData(existingData, beforeTime, *state, includeBucketInfo);
        state->mergeInto(getBucketDBUpdater().getDistributorComponent().getBucketDatabase());
    }

    BucketDumper dumper_tmp(true);
    getBucketDatabase().forEach(dumper_tmp);

    {
        // Second merge: apply the new data at a later timestamp (2).
        auto cmd(std::make_shared<api::SetSystemStateCommand>(
                lib::ClusterState(newState)));

        api::Timestamp afterTime(2);

        ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
        std::unique_ptr<PendingClusterState> state(
                PendingClusterState::createForClusterStateChange(
                        clock, clusterInfo, sender, cmd, outdatedNodes,
                        afterTime));

        parseInputData(newData, afterTime, *state, includeBucketInfo);
        state->mergeInto(getBucketDBUpdater().getDistributorComponent()
                         .getBucketDatabase());
    }

    BucketDumper dumper(includeBucketInfo);
    getBucketDBUpdater().getDistributorComponent()
        .getBucketDatabase().forEach(dumper);
    getBucketDBUpdater().getDistributorComponent()
        .getBucketDatabase().clear();
    return dumper.ost.str();
}

// Convenience overload: both merges use the same "distributor:1 storage:3"
// cluster state.
std::string
BucketDBUpdaterTest::mergeBucketLists(const std::string& existingData,
                                      const std::string& newData,
                                      bool includeBucketInfo)
{
    return mergeBucketLists(
            lib::ClusterState("distributor:1 storage:3"),
            existingData,
            lib::ClusterState("distributor:1 storage:3"),
            newData,
            includeBucketInfo);
}

// Table-driven test of how a second round of fetched bucket info is merged
// into a database seeded by a first round: initializing fetches, lost disks,
// new nodes, removed/added buckets, and empty node responses.
void
BucketDBUpdaterTest::testPendingClusterStateMerge()
{
    // Simple initializing case - ask all nodes for info
    CPPUNIT_ASSERT_EQUAL(
            // Result is on the form: [bucket w/o count bits]:[node indexes]|..
            std::string("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|"),
            // Input is on the form: [node]:[bucket w/o count bits]|...
            mergeBucketLists(
                    "",
                    "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6"));

    // Node came up with fewer buckets (lost disk)
    CPPUNIT_ASSERT_EQUAL(
            std::string("4:1|2:0,1|6:1,2|1:2,0|5:2|3:2,1|"),
            mergeBucketLists(
                    lib::ClusterState("distributor:1 storage:3"),
                    "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
                    lib::ClusterState("distributor:1 storage:3 .0.d:3 .0.d.1.s:d"),
                    "0:1,2")
            );

    // New node came up
    CPPUNIT_ASSERT_EQUAL(
            std::string("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|"),
            mergeBucketLists(
                    "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
                    "3:1,3,5,6"));

    // Node came up with some buckets removed and some added
    // Buckets that were removed should not be removed as the node
    // didn't lose a disk.
    CPPUNIT_ASSERT_EQUAL(
            std::string("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|"),
            mergeBucketLists(
                    "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
                    "0:1,2,6,8"));

    // Node came up with no buckets
    CPPUNIT_ASSERT_EQUAL(
            std::string("4:1|2:1|6:1,2|1:2|5:2|3:2,1|"),
            mergeBucketLists(
                    lib::ClusterState("distributor:1 storage:3"),
                    "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
                    lib::ClusterState("distributor:1 storage:3 .0.d:3 .0.d.1.s:d"),
                    "0:")
            );

    // One node lost a disk, another was just reasked (distributor
    // change)
    CPPUNIT_ASSERT_EQUAL(
            std::string("2:0,1|6:1,2|1:2,0|5:2|3:2,1|"),
            mergeBucketLists(
                    lib::ClusterState("distributor:1 storage:3"),
                    "0:1,2,4,5|1:2,3,6|2:1,3,5,6",
                    lib::ClusterState("distributor:1 storage:3 .0.d:3 .0.d.1.s:d"),
                    "0:1,2|1:2,3")
            );

    // Bucket info format is "bucketid/checksum/count/size"
    // Node went from initializing to up and invalid bucket went to empty.
    CPPUNIT_ASSERT_EQUAL(
            std::string("2:0/0/0/0/t|"),
            mergeBucketLists(
                    "0:2/0/0/1",
                    "0:2/0/0/0",
                    true));

    CPPUNIT_ASSERT_EQUAL(
            std::string("5:1/2/3/4/u,0/0/0/0/u|"),
            mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true));
}

void
BucketDBUpdaterTest::testPendingClusterStateMergeReplicaChanged()
{
    // Node went from initializing to up and non-invalid bucket changed.
    CPPUNIT_ASSERT_EQUAL(
            std::string("2:0/2/3/4/t|3:0/2/4/6/t|"),
            mergeBucketLists(
                    lib::ClusterState("distributor:1 storage:1 .0.s:i"),
                    "0:2/1/2/3,3/2/4/6",
                    lib::ClusterState("distributor:1 storage:1"),
                    "0:2/2/3/4,3/2/4/6",
                    true));
}

// Verifies that a late reply to a single-bucket recheck does not re-create
// ("resurrect") a DB entry for a bucket the distributor no longer owns in
// the current cluster state.
void
BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
{
    document::BucketId bucket(16, 3);
    lib::ClusterState stateBefore("distributor:1 storage:1");
    {
        uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
        setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
    }
    _sender.clear();

    getBucketDBUpdater().recheckBucketInfo(0, bucket);

    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
    std::shared_ptr<api::RequestBucketInfoCommand> rbi(
            std::dynamic_pointer_cast<RequestBucketInfoCommand>(
                    _sender.commands[0]));

    // Transition to a state where this distributor no longer owns the bucket.
    lib::ClusterState stateAfter("distributor:3 storage:3");

    {
        uint32_t expectedMsgs = 2, dummyBucketsToReturn = 1;
        setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn);
    }
    CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
                   .ownsBucketInCurrentState(bucket));

    // The late reply must be discarded instead of inserted into the DB.
    sendFakeReplyForSingleBucketRequest(*rbi);

    CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket));
}

// Same protection, but for a bucket not owned in the *pending* (not yet
// enabled) cluster state.
void
BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
{
    document::BucketId bucket(16, 3);
    lib::ClusterState stateBefore("distributor:1 storage:1");
    {
        uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
        setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
    }
    _sender.clear();

    getBucketDBUpdater().recheckBucketInfo(0, bucket);

    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
    std::shared_ptr<api::RequestBucketInfoCommand> rbi(
            std::dynamic_pointer_cast<RequestBucketInfoCommand>(
                    _sender.commands[0]));

    lib::ClusterState stateAfter("distributor:3 storage:3");
    // Set, but _don't_ enable cluster state. We want it to be pending.
    setSystemState(stateAfter);
    CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent()
                   .ownsBucketInCurrentState(bucket));
    CPPUNIT_ASSERT(!getBucketDBUpdater()
                   .checkOwnershipInPendingState(bucket).isOwned());

    sendFakeReplyForSingleBucketRequest(*rbi);

    CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket));
}

/*
 * If we get a distribution config change, it's important that cluster states that
 * arrive after this--but _before_ the pending cluster state has finished--must trigger
 * a full bucket info fetch no matter what the cluster state change was! Otherwise, we
 * will with a high likelihood end up not getting the complete view of the buckets in
 * the cluster.
 */
void
BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending()
{
    lib::ClusterState stateBefore("distributor:6 storage:6");
    {
        uint32_t expectedMsgs = 6, dummyBucketsToReturn = 1;
        setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
    }
    _sender.clear();
    std::string distConfig(getDistConfig6Nodes3Groups());
    {
        _node->getComponentRegister().setDistribution(
                std::make_shared<lib::Distribution>(distConfig));
        _distributor->storageDistributionChanged();
        _distributor->enableNextDistribution();
    }
    sortSentMessagesByIndex(_sender);
    // Distribution change alone triggers a full fetch (one per node).
    CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
    // Suddenly, a wild cluster state change appears! Even though this state
    // does not in itself imply any bucket changes, it will still overwrite the
    // pending cluster state and thus its state of pending bucket info requests.
    setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6"));

    // A second full fetch (6 more commands) must have been issued.
    CPPUNIT_ASSERT_EQUAL(size_t(12), _sender.commands.size());

    // Send replies for first 6 (outdated requests).
    int numBuckets = 10;
    for (uint32_t i = 0; i < 6; ++i) {
        fakeBucketReply(
                lib::ClusterState("distributor:6 storage:6"),
                dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
                i, numBuckets);
    }
    // No change from these.
    assertCorrectBuckets(1, "distributor:6 storage:6");

    // Send for current pending.
    for (uint32_t i = 0; i < 6; ++i) {
        fakeBucketReply(
                lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
                dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i + 6]),
                i, numBuckets);
    }
    assertCorrectBuckets(numBuckets, "distributor:6 storage:6");
    _sender.clear();

    // No more pending global fetch; this should be a no-op state.
    setSystemState(lib::ClusterState("distributor:6 .3.t:12345 storage:6"));
    CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
}

// Verifies that a distribution config change puts the distributor into
// recovery mode, but only after the resulting bucket info fetch completes
// and the pending state is enabled.
void
BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
{
    setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), 6, 20);
    _sender.clear();
    // First cluster state; implicit scan of all buckets which does not
    // use normal recovery mode ticking-path.
    CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());

    std::string distConfig(getDistConfig6Nodes4Groups());
    _node->getComponentRegister().setDistribution(
            std::make_shared<lib::Distribution>(distConfig));
    _distributor->storageDistributionChanged();
    _distributor->enableNextDistribution();
    sortSentMessagesByIndex(_sender);
    // No replies received yet, still no recovery mode.
    CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());

    CPPUNIT_ASSERT_EQUAL(size_t(6), _sender.commands.size());
    uint32_t numBuckets = 10;
    for (uint32_t i = 0; i < 6; ++i) {
        fakeBucketReply(
                lib::ClusterState("distributor:6 storage:6"),
                dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]),
                i, numBuckets);
    }

    // Pending cluster state (i.e. distribution) has been enabled, which should
    // cause recovery mode to be entered.
    CPPUNIT_ASSERT(_distributor->isInRecoveryMode());
}

// Verifies that buckets inserted via a full bucket info fetch get the
// current wall-clock time as their last-garbage-collection timestamp.
void
BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp()
{
    getClock().setAbsoluteTimeInSeconds(101234);
    lib::ClusterState stateBefore("distributor:1 storage:1");
    {
        uint32_t expectedMsgs = 1, dummyBucketsToReturn = 1;
        setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
    }

    // setAndEnableClusterState adds n buckets with id (16, i)
    document::BucketId bucket(16, 0);
    BucketDatabase::Entry e(getBucket(bucket));
    CPPUNIT_ASSERT(e.valid());
    CPPUNIT_ASSERT_EQUAL(uint32_t(101234), e->getLastGarbageCollectionTime());
}

// Verifies that a mutation reply processed while a full bucket info fetch is
// in flight is not overwritten by the (older) info in the eventual fetch
// reply — DB updates are timestamp-ordered.
void
BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
{
    {
        lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i");
        uint32_t expectedMsgs = 1, dummyBucketsToReturn = 0;
        // This step is required to make the distributor ready for accepting
        // the below explicit database insertion towards node 0.
        setAndEnableClusterState(stateBefore, expectedMsgs,
                                 dummyBucketsToReturn);
    }
    _sender.clear();
    getClock().setAbsoluteTimeInSeconds(1000);
    lib::ClusterState state("distributor:1 storage:1");
    setSystemState(state);
    CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());

    // Before replying with the bucket info, simulate the arrival of a mutation
    // reply that alters the state of the bucket with information that will be
    // more recent that what is returned by the bucket info. This information
    // must not be lost when the bucket info is later merged into the database.
+ document::BucketId bucket(16, 1); + constexpr uint64_t insertionTimestamp = 1001ULL * 1000000; + api::BucketInfo wantedInfo(5, 6, 7); + getBucketDBUpdater().getDistributorComponent().updateBucketDatabase( + bucket, + BucketCopy(insertionTimestamp, 0, wantedInfo), + DatabaseUpdate::CREATE_IF_NONEXISTING); + + getClock().setAbsoluteTimeInSeconds(1002); + constexpr uint32_t bucketsReturned = 10; // Buckets (16, 0) ... (16, 9) + // Return bucket information which on the timeline might originate from + // anywhere between [1000, 1002]. Our assumption is that any mutations + // taking place after t=1000 must have its reply received and processed + // by this distributor and timestamped strictly higher than t=1000 (modulo + // clock skew, of course, but that is outside the scope of this). A mutation + // happening before t=1000 but receiving a reply at t>1000 does not affect + // correctness, as this should contain the same bucket info as that + // contained in the full bucket reply and the DB update is thus idempotent. 
+ fakeBucketReply( + state, + dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]), + 0, + bucketsReturned); + + BucketDatabase::Entry e(getBucket(bucket)); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount()); + CPPUNIT_ASSERT_EQUAL(wantedInfo, e->getNodeRef(0).getBucketInfo()); + +} + +std::vector<uint16_t> +BucketDBUpdaterTest::getSendSet() const +{ + std::vector<uint16_t> nodes; + std::transform(_sender.commands.begin(), + _sender.commands.end(), + std::back_inserter(nodes), + [](auto& cmd) + { + auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd)); + return req.getAddress()->getIndex(); + }); + return nodes; +} + +std::vector<uint16_t> +BucketDBUpdaterTest::getSentNodesWithPreemption( + const std::string& oldClusterState, + uint32_t expectedOldStateMessages, + const std::string& preemptedClusterState, + const std::string& newClusterState) +{ + lib::ClusterState stateBefore(oldClusterState); + uint32_t dummyBucketsToReturn = 10; + setAndEnableClusterState(lib::ClusterState(oldClusterState), + expectedOldStateMessages, + dummyBucketsToReturn); + _sender.clear(); + + setSystemState(lib::ClusterState(preemptedClusterState)); + _sender.clear(); + // Do not allow the pending state to become the active state; trigger a + // new transition without ACKing the info requests first. This will + // overwrite the pending state entirely. + setSystemState(lib::ClusterState(newClusterState)); + return getSendSet(); +} + +using nodeVec = std::vector<uint16_t>; + +/* + * If we don't carry over the set of nodes that we need to fetch from, + * a naive comparison between the active state and the new state will + * make it appear to the distributor that nothing has changed, as any + * database modifications caused by intermediate states will not be + * accounted for (basically the ABA problem in a distributed setting). 
+ */ +void +BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() +{ + CPPUNIT_ASSERT_EQUAL( + (nodeVec{0, 1, 2, 3, 4, 5}), + getSentNodesWithPreemption("version:1 distributor:6 storage:6", 6, + "version:2 distributor:6 .5.s:d storage:6", + "version:3 distributor:6 storage:6")); +} + +void +BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() +{ + CPPUNIT_ASSERT_EQUAL( + (nodeVec{2, 3}), + getSentNodesWithPreemption( + "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:2 distributor:6 storage:6 .2.s:d .3.s:d", + "version:3 distributor:6 storage:6")); +} + +void +BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() +{ + CPPUNIT_ASSERT_EQUAL( + (nodeVec{2}), + getSentNodesWithPreemption( + "version:1 distributor:6 storage:6", 6, + "version:2 distributor:6 storage:6 .2.s:d", + "version:3 distributor:6 storage:6")); +} + +void +BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() +{ + CPPUNIT_ASSERT_EQUAL( + nodeVec{}, + getSentNodesWithPreemption( + "version:1 distributor:6 storage:6 .2.s:d", 5, + "version:2 distributor:6 storage:6", // Sends to 2. + "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. +} + +void +BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() +{ + // Even though 100 nodes are preempted, not all of these should be part + // of the request afterwards when only 6 are part of the state. 
+ CPPUNIT_ASSERT_EQUAL( + (nodeVec{0, 1, 2, 3, 4, 5}), + getSentNodesWithPreemption( + "version:1 distributor:6 storage:100", 100, + "version:2 distributor:5 .4.s:d storage:100", + "version:3 distributor:6 storage:6")); +} + +void +BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() +{ + lib::ClusterState stateBefore( + "version:1 distributor:6 storage:6 .1.t:1234"); + uint32_t expectedMsgs = 6, dummyBucketsToReturn = 10; + setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + _sender.clear(); + // New cluster state that should not by itself trigger any new fetches, + // unless outdated node set is somehow not cleared after an enabled + // (completed) cluster state has been set. + lib::ClusterState stateAfter("version:3 distributor:6 storage:6"); + setSystemState(stateAfter); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); +} + +// XXX test currently disabled since distribution config currently isn't used +// at all in order to deduce the set of nodes to send to. This might not matter +// in practice since it is assumed that the cluster state matching the new +// distribution config will follow very shortly after the config has been +// applied to the node. The new cluster state will then send out requests to +// the correct node set. +void +BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes() +{ + uint32_t expectedMsgs = 6, dummyBucketsToReturn = 20; + setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), + expectedMsgs, dummyBucketsToReturn); + _sender.clear(); + + // Intentionally trigger a racing config change which arrives before the + // new cluster state representing it. 
+ std::string distConfig(getDistConfig3Nodes1Group()); + _node->getComponentRegister().setDistribution( + std::make_shared<lib::Distribution>(distConfig)); + _distributor->storageDistributionChanged(); + _distributor->enableNextDistribution(); + sortSentMessagesByIndex(_sender); + + CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1, 2}), getSendSet()); +} + +void +BucketDBUpdaterTest::changedDiskSetTriggersReFetch() +{ + // Same number of online disks, but the set of disks has changed. + CPPUNIT_ASSERT_EQUAL( + std::string("1"), + getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d", + "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); +} + +/** + * Test scenario where a cluster is downsized by removing a subset of the nodes + * from the distribution configuration. The system must be able to deal with + * a scenario where the set of nodes between two cluster states across a config + * change may differ. + * + * See VESPA-790 for details. + */ +void +BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() +{ + uint32_t expectedMsgs = 3, dummyBucketsToReturn = 1; + setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), + expectedMsgs, dummyBucketsToReturn); + _sender.clear(); + + // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config + // that does not contain node 2 while the _active_ cluster state still + // contains this node. 
+ const char* downsizeCfg = + "redundancy 2\n" + "distributor_auto_ownership_transfer_on_whole_group_down true\n" + "group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[2]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n"; + + _node->getComponentRegister().setDistribution( + std::make_shared<lib::Distribution>(downsizeCfg)); + _distributor->storageDistributionChanged(); + _distributor->enableNextDistribution(); + sortSentMessagesByIndex(_sender); + _sender.clear(); + + // Attempt to apply state with {0, 1} set. This will compare the new state + // with the previous state, which still has node 2. + expectedMsgs = 2; + setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), + expectedMsgs, dummyBucketsToReturn); + + CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1}), getSendSet()); +} + +} // distributor +} // storage |