diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-08 13:51:05 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-08 14:41:04 +0000 |
commit | 7a0ffc114735b225c9fb9c9d9941313c5a29c725 (patch) | |
tree | bcfcb68c1d330206e7a71de6f162f1d5ac142329 | |
parent | 4e79e473c7fc914d709bbe6d2af56fd1fb9ccb83 (diff) |
Change pending cluster state to handle more than one bucket space.
-rw-r--r-- | storage/src/vespa/storage/distributor/pendingclusterstate.cpp | 47 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/pendingclusterstate.h | 20 |
2 files changed, 43 insertions, 24 deletions
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 256d0fd407f..a78b66265bd 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -39,7 +39,7 @@ PendingClusterState::PendingClusterState( _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), - _pendingTransition() + _pendingTransitions() { logConstructionInformation(); if (hasBucketOwnershipTransfer()) { @@ -67,7 +67,7 @@ PendingClusterState::PendingClusterState( _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), _bucketOwnershipTransfer(true), - _pendingTransition() + _pendingTransitions() { logConstructionInformation(); markAllAvailableNodesAsRequiringRequest(); @@ -79,7 +79,9 @@ PendingClusterState::~PendingClusterState() {} void PendingClusterState::constructorHelper() { - _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); + for (auto &elem : _bucketSpaceRepo) { + _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp)); + } if (shouldRequestBucketInfo()) { requestNodes(); } @@ -225,7 +227,9 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() { for (uint16_t idx : _outdatedNodes) { if (storageNodeUpInNewState(idx)) { - requestNode(idx); + for (auto &elem : _bucketSpaceRepo) { + requestNode(BucketSpaceAndNode(elem.first, idx)); + } } } } @@ -318,19 +322,19 @@ PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown( } void -PendingClusterState::requestNode(uint16_t node) +PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { vespalib::string distributionHash(_clusterInfo->getDistributionHash()); LOG(debug, "Requesting bucket info for node %d with cluster state '%s' " "and distribution hash '%s'", - node, + bucketSpaceAndNode.node, _newClusterState.toString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( new api::RequestBucketInfoCommand( - BucketSpace::placeHolder(), + bucketSpaceAndNode.bucketSpace, _sender.getDistributorIndex(), _newClusterState, distributionHash)); @@ -338,9 +342,9 @@ PendingClusterState::requestNode(uint16_t node) cmd->setPriority(api::StorageMessage::HIGH); cmd->setTimeout(INT_MAX); - _sentMessages[cmd->getMsgId()] = node; + _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode); - _sender.sendToNode(NodeType::STORAGE, node, cmd); + _sender.sendToNode(NodeType::STORAGE, bucketSpaceAndNode.node, cmd); } @@ -364,18 +368,20 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request if (iter == _sentMessages.end()) { return false; } - const uint16_t node = iter->second; + const BucketSpaceAndNode bucketSpaceAndNode = iter->second; if (!reply->getResult().success()) { framework::MilliSecTime resendTime(_clock); resendTime += framework::MilliSecTime(100); - _delayedRequests.push_back(std::make_pair(resendTime, node)); + _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode); _sentMessages.erase(iter); return true; } - setNodeReplied(node); - _pendingTransition->onRequestBucketInfoReply(*reply, node); + setNodeReplied(bucketSpaceAndNode.node); + auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node); _sentMessages.erase(iter); return true; @@ -411,7 +417,9 @@ PendingClusterState::requestNodesToString() const void PendingClusterState::mergeIntoBucketDatabases() { - _pendingTransition->mergeInto(_bucketSpaceRepo.get(BucketSpace::placeHolder()).getBucketDatabase()); + for (auto &elem : _bucketSpaceRepo) { + _pendingTransitions[elem.first]->mergeInto(elem.second->getBucketDatabase()); + } } void @@ -420,11 +428,9 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const using namespace vespalib::xml; xos << XmlTag("systemstate_pending") << XmlAttribute("state", _newClusterState); - for (std::map<uint64_t, uint16_t>::const_iterator iter - = _sentMessages.begin(); iter != _sentMessages.end(); ++iter) - { + for (auto &elem : _sentMessages) { xos << XmlTag("pending") - << XmlAttribute("node", iter->second) + << XmlAttribute("node", elem.second.node) << XmlEndTag(); } xos << XmlEndTag(); @@ -441,8 +447,9 @@ PendingClusterState::getSummary() const PendingBucketSpaceDbTransition & PendingClusterState::getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace) { - (void) bucketSpace; - return *_pendingTransition; + auto transitionIter = _pendingTransitions.find(bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + return *transitionIter->second; } } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 62b4f8c2b5a..3deed06342e 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -10,6 +10,7 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlserializable.h> #include <unordered_set> +#include <unordered_map> #include <deque> namespace storage::distributor { @@ -164,9 +165,20 @@ private: DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp); + struct BucketSpaceAndNode { + document::BucketSpace bucketSpace; + uint16_t node; + BucketSpaceAndNode(document::BucketSpace bucketSpace_, + uint16_t node_) + : bucketSpace(bucketSpace_), + node(node_) + { + } + }; + void constructorHelper(); void logConstructionInformation() const; - void requestNode(uint16_t node); + void requestNode(BucketSpaceAndNode bucketSpaceAndNode); bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState); bool storageNodeMayHaveLostData(uint16_t index); bool storageNodeChanged(uint16_t index); @@ -191,9 +203,9 @@ private: std::shared_ptr<api::SetSystemStateCommand> _cmd; - std::map<uint64_t, uint16_t> _sentMessages; + std::map<uint64_t, BucketSpaceAndNode> _sentMessages; std::vector<bool> _requestedNodes; - std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests; + std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; // Set for all nodes that may have changed state since that previous // active cluster state, or that were marked as outdated when the pending @@ -214,7 +226,7 @@ private: bool _distributionChange; bool _bucketOwnershipTransfer; - std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition; + std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; } |