diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2017-11-09 16:51:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-09 16:51:38 +0100 |
commit | 310675a44a01e9f671b06d63a2043cdad2cfc58a (patch) | |
tree | 2ee82fc73ebb1f8fbe358a604adde4dfeeb5fe12 /storage | |
parent | 361e3e62b28f9193b48b88d6c4fd3c89facf9862 (diff) | |
parent | ee7a78f7cded5cdddfcc76f5651126f01d36fa9f (diff) |
Merge pull request #4049 from vespa-engine/toregge/change-pending-cluster-state-to-handle-multiple-bucket-spaces
Change pending cluster state to handle multiple bucket spaces
Diffstat (limited to 'storage')
7 files changed, 111 insertions, 65 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index d1a54c04359..363065be65c 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -4,11 +4,13 @@ #include <iomanip> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/simpleclusterinformation.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/vespalib/text/stringtokenizer.h> @@ -16,6 +18,7 @@ using namespace storage::api; using namespace storage::lib; using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; namespace storage { namespace distributor { @@ -538,7 +541,7 @@ public: std::unordered_set<uint16_t> outdatedNodes; state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodes, api::Timestamp(1)); } @@ -551,7 +554,7 @@ public: std::unordered_set<uint16_t> outdatedNodes; state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, api::Timestamp(1)); + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } }; @@ -1475,7 +1478,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); sortSentMessagesByIndex(sender); @@ -1640,7 +1643,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() std::unordered_set<uint16_t> outdatedNodes; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, api::Timestamp(1))); CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); @@ -1668,7 +1671,8 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() state->done()); } - CPPUNIT_ASSERT_EQUAL(3, (int)state->results().size()); + auto &pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); + CPPUNIT_ASSERT_EQUAL(3, (int)pendingTransition.results().size()); } void @@ -1721,13 +1725,14 @@ parseInputData(const std::string& data, uint16_t node = atoi(tok2[0].c_str()); state.setNodeReplied(node); + auto &pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); vespalib::StringTokenizer tok3(tok2[1], ","); for (uint32_t j = 0; j < tok3.size(); j++) { if (includeBucketInfo) { vespalib::StringTokenizer tok4(tok3[j], "/"); - state.addNodeInfo( + pendingTransition.addNodeInfo( document::BucketId(16, atoi(tok4[0].c_str())), BucketCopy( timestamp, @@ -1739,7 +1744,7 @@ parseInputData(const std::string& data, atoi(tok4[2].c_str()), atoi(tok4[3].c_str())))); } else { - state.addNodeInfo( + pendingTransition.addNodeInfo( document::BucketId(16, atoi(tok3[j].c_str())), BucketCopy(timestamp, node, @@ -1803,11 +1808,11 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); - state->mergeInto(getBucketDBUpdater().getDistributorComponent().getBucketDatabase()); + state->mergeIntoBucketDatabases(); } BucketDumper dumper_tmp(true); @@ -1822,12 +1827,11 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); - state->mergeInto(getBucketDBUpdater().getDistributorComponent() - .getBucketDatabase()); + state->mergeIntoBucketDatabases(); } BucketDumper dumper(includeBucketInfo); diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 5deb31f8579..a0aa8e00070 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -358,6 +358,16 @@ DistributorTestUtil::getBucketDatabase() const { return _distributor->getDefaultBucketSpace().getBucketDatabase(); } +DistributorBucketSpaceRepo & +DistributorTestUtil::getBucketSpaceRepo() { + return _distributor->getBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo & +DistributorTestUtil::getBucketSpaceRepo() const { + return _distributor->getBucketSpaceRepo(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return _distributor->getDefaultBucketSpace().getDistribution(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 4f09c11ac03..19da0483165 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -20,6 +20,7 @@ namespace distributor { class BucketDBUpdater; class Distributor; class DistributorBucketSpace; +class DistributorBucketSpaceRepo; class IdealStateManager; class ExternalOperationHandler; class Operation; @@ -125,6 +126,8 @@ public: DistributorBucketSpace &getDistributorBucketSpace(); BucketDatabase& getBucketDatabase(); const BucketDatabase& getBucketDatabase() const; + DistributorBucketSpaceRepo &getBucketSpaceRepo(); + const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 569136b8b10..50431bda37e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -169,6 +169,7 @@ BucketDBUpdater::storageDistributionChanged( _bucketSpaceComponent.getClock(), std::move(clusterInfo), _sender, + _bucketSpaceComponent.getBucketSpaceRepo(), _bucketSpaceComponent.getUniqueTimestamp()); _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); } @@ -216,6 +217,7 @@ BucketDBUpdater::onSetSystemState( _bucketSpaceComponent.getClock(), std::move(clusterInfo), _sender, + _bucketSpaceComponent.getBucketSpaceRepo(), cmd, _outdatedNodes, _bucketSpaceComponent.getUniqueTimestamp()); @@ -486,7 +488,7 @@ BucketDBUpdater::isPendingClusterStateCompleted() const void BucketDBUpdater::processCompletedPendingClusterState() { - _pendingClusterState->mergeInto(_bucketSpaceComponent.getBucketDatabase()); + _pendingClusterState->mergeIntoBucketDatabases(); if (_pendingClusterState->getCommand().get()) { enableCurrentClusterStateInDistributor(); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index f59b47574ba..3cb3408a951 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -161,6 +161,8 @@ public: DistributorBucketSpace& getDefaultBucketSpace() noexcept; const DistributorBucketSpace& getDefaultBucketSpace() const noexcept; + DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; } + const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; } private: friend class Distributor_Test; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 1186cc23c5f..e8f9442c76f 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -3,6 +3,7 @@ #include "pendingclusterstate.h" #include "pending_bucket_space_db_transition.h" #include "bucketdbupdater.h" +#include "distributor_bucket_space_repo.h" #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vespalib/util/xmlstream.hpp> @@ -23,6 +24,7 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const std::unordered_set<uint16_t>& outdatedNodes, api::Timestamp creationTimestamp) @@ -35,8 +37,9 @@ PendingClusterState::PendingClusterState( _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), + _bucketSpaceRepo(bucketSpaceRepo), _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), - _pendingTransition() + _pendingTransitions() { logConstructionInformation(); if (hasBucketOwnershipTransfer()) { @@ -45,16 +48,14 @@ PendingClusterState::PendingClusterState( updateSetOfNodesThatAreOutdated(); addAdditionalNodesToOutdatedSet(outdatedNodes); } - _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); - if (shouldRequestBucketInfo()) { - requestNodes(); - } + initializeBucketSpaceTransitions(); } PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), _outdatedNodes(clusterInfo->getStorageNodeCount()), @@ -64,19 +65,28 @@ PendingClusterState::PendingClusterState( _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), + _bucketSpaceRepo(bucketSpaceRepo), _bucketOwnershipTransfer(true), - _pendingTransition() + _pendingTransitions() { logConstructionInformation(); markAllAvailableNodesAsRequiringRequest(); - _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); + initializeBucketSpaceTransitions(); +} + +PendingClusterState::~PendingClusterState() {} + +void +PendingClusterState::initializeBucketSpaceTransitions() +{ + for (auto &elem : _bucketSpaceRepo) { + _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp)); + } if (shouldRequestBucketInfo()) { requestNodes(); } } -PendingClusterState::~PendingClusterState() {} - void PendingClusterState::logConstructionInformation() const { @@ -217,7 +227,9 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() { for (uint16_t idx : _outdatedNodes) { if (storageNodeUpInNewState(idx)) { - requestNode(idx); + for (auto &elem : _bucketSpaceRepo) { + requestNode(BucketSpaceAndNode(elem.first, idx)); + } } } } @@ -312,19 +324,20 @@ PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown( } void -PendingClusterState::requestNode(uint16_t node) +PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { vespalib::string distributionHash(_clusterInfo->getDistributionHash()); LOG(debug, - "Requesting bucket info for node %d with cluster state '%s' " + "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' " "and distribution hash '%s'", - node, + bucketSpaceAndNode.bucketSpace.getId(), + bucketSpaceAndNode.node, _newClusterState.toString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( new api::RequestBucketInfoCommand( - BucketSpace::placeHolder(), + bucketSpaceAndNode.bucketSpace, _sender.getDistributorIndex(), _newClusterState, distributionHash)); @@ -332,9 +345,9 @@ PendingClusterState::requestNode(uint16_t node) cmd->setPriority(api::StorageMessage::HIGH); cmd->setTimeout(INT_MAX); - _sentMessages[cmd->getMsgId()] = node; + _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode); - _sender.sendToNode(NodeType::STORAGE, node, cmd); + _sender.sendToNode(NodeType::STORAGE, bucketSpaceAndNode.node, cmd); } @@ -358,18 +371,20 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request if (iter == _sentMessages.end()) { return false; } - const uint16_t node = iter->second; + const BucketSpaceAndNode bucketSpaceAndNode = iter->second; if (!reply->getResult().success()) { framework::MilliSecTime resendTime(_clock); resendTime += framework::MilliSecTime(100); - _delayedRequests.push_back(std::make_pair(resendTime, node)); + _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode); _sentMessages.erase(iter); return true; } - setNodeReplied(node); - _pendingTransition->onRequestBucketInfoReply(*reply, node); + setNodeReplied(bucketSpaceAndNode.node); + auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node); _sentMessages.erase(iter); return true; @@ -403,9 +418,11 @@ PendingClusterState::requestNodesToString() const } void -PendingClusterState::mergeInto(BucketDatabase& db) +PendingClusterState::mergeIntoBucketDatabases() { - _pendingTransition->mergeInto(db); + for (auto &elem : _bucketSpaceRepo) { + _pendingTransitions[elem.first]->mergeInto(elem.second->getBucketDatabase()); + } } void @@ -414,11 +431,9 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const using namespace vespalib::xml; xos << XmlTag("systemstate_pending") << XmlAttribute("state", _newClusterState); - for (std::map<uint64_t, uint16_t>::const_iterator iter - = _sentMessages.begin(); iter != _sentMessages.end(); ++iter) - { + for (auto &elem : _sentMessages) { xos << XmlTag("pending") - << XmlAttribute("node", iter->second) + << XmlAttribute("node", elem.second.node) << XmlEndTag(); } xos << XmlEndTag(); @@ -432,16 +447,12 @@ PendingClusterState::getSummary() const (_clock.getTimeInMicros().getTime() - _creationTimestamp)); } -const PendingBucketSpaceDbTransition::EntryList & -PendingClusterState::results() const -{ - return _pendingTransition->results(); -} - -void -PendingClusterState::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) +PendingBucketSpaceDbTransition & +PendingClusterState::getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace) { - _pendingTransition->addNodeInfo(id, copy); + auto transitionIter = _pendingTransitions.find(bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + return *transitionIter->second; } } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index b742bd3bf46..ac5c4dc35ea 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -10,14 +10,14 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlserializable.h> #include <unordered_set> +#include <unordered_map> #include <deque> -namespace storage { class BucketDatabase; } - namespace storage::distributor { class DistributorMessageSender; class PendingBucketSpaceDbTransition; +class DistributorBucketSpaceRepo; /** * Class used by BucketDBUpdater to track request bucket info @@ -42,12 +42,13 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const std::unordered_set<uint16_t>& outdatedNodes, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, newStateCmd, + new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, outdatedNodes, creationTimestamp)); } @@ -60,10 +61,11 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, creationTimestamp)); + new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp)); } PendingClusterState(const PendingClusterState &) = delete; @@ -77,8 +79,8 @@ public: bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& reply); /** - * Tags the given node as having replied to the - * request bucket info command. + * Tags the given node as having replied to at least one of the + * request bucket info commands. Only used for debug logging. */ void setNodeReplied(uint16_t nodeIdx) { _requestedNodes[nodeIdx] = true; @@ -123,14 +125,11 @@ public: std::unordered_set<uint16_t> getOutdatedNodeSet() const; /** - * Merges all the results with the given bucket database. + * Merges all the results with the corresponding bucket databases. */ - void mergeInto(BucketDatabase& db); - // Get our list of information. Only used by unit test. - const std::vector<dbtransition::Entry>& results() const; - // Adds info from a node to our list of information. Only used by unit test. - void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); - + void mergeIntoBucketDatabases(); + // Get pending transition for a specific bucket space. Only used by unit test. + PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); /** * Returns true if this pending state was due to a distribution bit @@ -150,6 +149,7 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const std::unordered_set<uint16_t>& outdatedNodes, api::Timestamp creationTimestamp); @@ -162,10 +162,23 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp); + struct BucketSpaceAndNode { + document::BucketSpace bucketSpace; + uint16_t node; + BucketSpaceAndNode(document::BucketSpace bucketSpace_, + uint16_t node_) + : bucketSpace(bucketSpace_), + node(node_) + { + } + }; + + void initializeBucketSpaceTransitions(); void logConstructionInformation() const; - void requestNode(uint16_t node); + void requestNode(BucketSpaceAndNode bucketSpaceAndNode); bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState); bool storageNodeMayHaveLostData(uint16_t index); bool storageNodeChanged(uint16_t index); @@ -190,9 +203,9 @@ private: std::shared_ptr<api::SetSystemStateCommand> _cmd; - std::map<uint64_t, uint16_t> _sentMessages; + std::map<uint64_t, BucketSpaceAndNode> _sentMessages; std::vector<bool> _requestedNodes; - std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests; + std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; // Set for all nodes that may have changed state since that previous // active cluster state, or that were marked as outdated when the pending @@ -209,10 +222,11 @@ private: api::Timestamp _creationTimestamp; DistributorMessageSender& _sender; + DistributorBucketSpaceRepo &_bucketSpaceRepo; bool _distributionChange; bool _bucketOwnershipTransfer; - std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition; + std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; } |