diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-10 13:05:00 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-10 13:05:00 +0000 |
commit | 3665c480590ead529527d5bfc9f2445e0eabdfa3 (patch) | |
tree | 88c4911d07e8b2b6644116fb97b123c4a1bc5012 /storage | |
parent | c70c1047b80c0beac1c11fdb6b11bcd5b6efcb5b (diff) |
Track outdated nodes per bucket space.
Use distributor bucket space to get distribution when detecting outdated nodes.
Diffstat (limited to 'storage')
7 files changed, 266 insertions, 219 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 363065be65c..e792eb5a48d 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -157,6 +157,8 @@ protected: } public: + using OutdatedNodes = std::unordered_set<uint16_t>; + using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>; void setUp() override { createLinks(); }; @@ -539,9 +541,9 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1)); } @@ -552,7 +554,6 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; state = PendingClusterState::createForDistributionChange( clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } @@ -1640,10 +1641,10 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() framework::defaultimplementation::FakeClock clock; ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); @@ -1798,7 +1799,7 @@ BucketDBUpdaterTest::mergeBucketLists( framework::MilliSecTimer timer(clock); MessageSenderStub sender; - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; { auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState)); @@ -1808,7 +1809,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); @@ -1827,7 +1828,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 50431bda37e..b68e53c7136 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -171,7 +171,7 @@ BucketDBUpdater::storageDistributionChanged( _sender, _bucketSpaceComponent.getBucketSpaceRepo(), _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); } void @@ -219,9 +219,9 @@ BucketDBUpdater::onSetSystemState( _sender, _bucketSpaceComponent.getBucketSpaceRepo(), cmd, - _outdatedNodes, + _outdatedNodesMap, _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); @@ -500,7 +500,7 @@ BucketDBUpdater::processCompletedPendingClusterState() } _pendingClusterState.reset(); - _outdatedNodes.clear(); + _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 994e207f200..6399519e5f8 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -27,6 +27,8 @@ class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler { public: + using OutdatedNodes = std::unordered_set<uint16_t>; + using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>; BucketDBUpdater(Distributor& owner, DistributorBucketSpaceRepo &bucketSpaceRepo, DistributorBucketSpace& bucketSpace, @@ -226,7 +228,7 @@ private: std::list<PendingClusterState::Summary> _history; DistributorMessageSender& _sender; std::set<EnqueuedBucketRecheck> _enqueuedRechecks; - std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodesMap _outdatedNodesMap; framework::MilliSecTimer _transitionTimer; }; diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index c2b4358333a..506c2b1e4a3 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -12,9 +12,17 @@ LOG_SETUP(".pendingbucketspacedbtransition"); namespace storage::distributor { +using lib::Node; +using lib::NodeType; +using lib::NodeState; + PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, DistributorBucketSpace &distributorBucketSpace, + int distributorIndex, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, + const lib::ClusterState &prevClusterState, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp) : _entries(), @@ -22,12 +30,24 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus _removedBuckets(), _missingEntries(), _clusterInfo(std::move(clusterInfo)), - _outdatedNodes(pendingClusterState.getOutdatedNodeSet()), + _outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)), + _prevClusterState(prevClusterState), _newClusterState(newClusterState), _creationTimestamp(creationTimestamp), _pendingClusterState(pendingClusterState), - _distributorBucketSpace(distributorBucketSpace) + _distributorBucketSpace(distributorBucketSpace), + _distributorIndex(distributorIndex), + _bucketOwnershipTransfer(distributionChanged) { + if (distributorChanged()) { + _bucketOwnershipTransfer = true; + } + if (_bucketOwnershipTransfer) { + markAllAvailableNodesAsRequiringRequest(); + } else { + updateSetOfNodesThatAreOutdated(); + addAdditionalNodesToOutdatedSet(outdatedNodes); + } } PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() @@ -70,10 +90,12 @@ PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Ra std::vector<BucketCopy> copiesToAddOrUpdate( getCopiesThatAreNewOrAltered(info, range)); + const auto &dist(_distributorBucketSpace.getDistribution()); std::vector<uint16_t> order( - _clusterInfo->getIdealStorageNodesForState( + dist.getIdealStorageNodes( _newClusterState, - _entries[range.first].bucketId)); + _entries[range.first].bucketId, + _clusterInfo->getStorageUpStates())); info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); LOG_BUCKET_OPERATION_NO_LOCK( @@ -228,6 +250,170 @@ PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucke } } +bool +PendingBucketSpaceDbTransition::distributorChanged() +{ + const auto &oldState(_prevClusterState); + const auto &newState(_newClusterState); + if (newState.getDistributionBitCount() != oldState.getDistributionBitCount()) { + return true; + } + + Node myNode(NodeType::DISTRIBUTOR, _distributorIndex); + if (oldState.getNodeState(myNode).getState() == lib::State::DOWN) { + return true; + } + + uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); + uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); + + uint16_t maxCount = std::max(oldCount, newCount); + + for (uint16_t i = 0; i < maxCount; ++i) { + Node node(NodeType::DISTRIBUTOR, i); + + const lib::State& old(oldState.getNodeState(node).getState()); + const lib::State& nw(newState.getNodeState(node).getState()); + + if (nodeWasUpButNowIsDown(old, nw)) { + if (nodeInSameGroupAsSelf(i) || + nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { + return true; + } + } + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old, + const lib::State& nw) +{ + return (old.oneOf("uimr") && !nw.oneOf("uimr")); +} + +bool +PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (dist.getNodeGraph().getGroupForNode(index) == + dist.getNodeGraph().getGroupForNode(_distributorIndex)) { + LOG(debug, + "Distributor %d state changed, need to request data from all " + "storage nodes", + index); + return true; + } else { + LOG(debug, + "Distributor %d state changed but unrelated to my group.", + index); + return false; + } +} + +bool +PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown( + uint16_t nodeIndex, + const lib::ClusterState& state) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { + return false; // Not doing anything for downed groups. + } + const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); + // If there is no group information associated with the node (because the + // group has changed or the node has been removed from config), we must + // also invoke ownership transfer of buckets. + if (group == nullptr + || lib::Distribution::allDistributorsDown(*group, state)) + { + LOG(debug, + "Distributor %u state changed and is in a " + "group that now has no distributors remaining", + nodeIndex); + return true; + } + return false; +} + +uint16_t +PendingBucketSpaceDbTransition::newStateStorageNodeCount() const +{ + return _newClusterState.getNodeCount(lib::NodeType::STORAGE); +} + +bool +PendingBucketSpaceDbTransition::storageNodeMayHaveLostData(uint16_t index) +{ + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldState = _prevClusterState.getNodeState(node); + + return (newState.getStartTimestamp() > oldState.getStartTimestamp()); +} + +void +PendingBucketSpaceDbTransition::updateSetOfNodesThatAreOutdated() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t index = 0; index < nodeCount; ++index) { + if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { + _outdatedNodes.insert(index); + } + } +} + +bool +PendingBucketSpaceDbTransition::storageNodeChanged(uint16_t index) { + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldNodeState = _prevClusterState.getNodeState(node); + + // similarTo() also covers disk states. + if (!(oldNodeState.similarTo(newState))) { + LOG(debug, + "State for storage node %d has changed from '%s' to '%s', " + "updating bucket information", + index, + oldNodeState.toString().c_str(), + newState.toString().c_str()); + return true; + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::storageNodeUpInNewState(uint16_t node) const +{ + return _newClusterState.getNodeState(Node(NodeType::STORAGE, node)) + .getState().oneOf(_clusterInfo->getStorageUpStates()); +} + +void +PendingBucketSpaceDbTransition::markAllAvailableNodesAsRequiringRequest() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t i = 0; i < nodeCount; ++i) { + if (storageNodeUpInNewState(i)) { + _outdatedNodes.insert(i); + } + } +} + +void +PendingBucketSpaceDbTransition::addAdditionalNodesToOutdatedSet( + const std::unordered_set<uint16_t>& nodes) +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t node : nodes) { + if (node < nodeCount) { + _outdatedNodes.insert(node); + } + } +} + void PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) { diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index 730d6540820..0619218b6a4 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -6,7 +6,7 @@ #include <unordered_set> namespace storage::api { class RequestBucketInfoReply; } -namespace storage::lib { class ClusterState; } +namespace storage::lib { class ClusterState; class State; } namespace storage::distributor { @@ -24,6 +24,7 @@ class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProces public: using Entry = dbtransition::Entry; using EntryList = std::vector<Entry>; + using OutdatedNodes = std::unordered_set<uint16_t>; private: using Range = std::pair<uint32_t, uint32_t>; @@ -38,12 +39,15 @@ private: // cluster state was constructed. // May be a superset of _requestedNodes, as some nodes that are outdated // may be down and thus cannot get a request. - const std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodes _outdatedNodes; + const lib::ClusterState &_prevClusterState; const lib::ClusterState &_newClusterState; const api::Timestamp _creationTimestamp; const PendingClusterState &_pendingClusterState; DistributorBucketSpace &_distributorBucketSpace; + int _distributorIndex; + bool _bucketOwnershipTransfer; // BucketDataBase::MutableEntryProcessor API bool process(BucketDatabase::Entry& e) override; @@ -73,10 +77,26 @@ private: bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; std::string requestNodesToString(); + bool distributorChanged(); + static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw); + bool storageNodeUpInNewState(uint16_t node) const; + bool nodeInSameGroupAsSelf(uint16_t index) const; + bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; + uint16_t newStateStorageNodeCount() const; + bool storageNodeMayHaveLostData(uint16_t index); + bool storageNodeChanged(uint16_t index); + void markAllAvailableNodesAsRequiringRequest(); + void addAdditionalNodesToOutdatedSet(const OutdatedNodes &nodes); + void updateSetOfNodesThatAreOutdated(); + public: PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, DistributorBucketSpace &distributorBucketSpace, + int distributorIndex, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, + const lib::ClusterState &prevClusterState, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp); ~PendingBucketSpaceDbTransition(); @@ -87,6 +107,9 @@ public: // Adds the info from the reply to our list of information. void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node); + const OutdatedNodes &getOutdatedNodes() { return _outdatedNodes; } + bool getBucketOwnershipTransfer() const { return _bucketOwnershipTransfer; } + // Methods used by unit tests. const EntryList& results() const { return _entries; } void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 833899140d8..f8a80569e1d 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -26,11 +26,10 @@ PendingClusterState::PendingClusterState( DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) : _cmd(newStateCmd), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), - _outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(newStateCmd->getSystemState()), _clock(clock), @@ -38,17 +37,11 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), - _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), + _bucketOwnershipTransfer(false), _pendingTransitions() { logConstructionInformation(); - if (hasBucketOwnershipTransfer()) { - markAllAvailableNodesAsRequiringRequest(); - } else { - updateSetOfNodesThatAreOutdated(); - addAdditionalNodesToOutdatedSet(outdatedNodes); - } - initializeBucketSpaceTransitions(); + initializeBucketSpaceTransitions(false, outdatedNodesMap); } PendingClusterState::PendingClusterState( @@ -58,7 +51,6 @@ PendingClusterState::PendingClusterState( DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), - _outdatedNodes(clusterInfo->getStorageNodeCount()), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(clusterInfo->getClusterState()), _clock(clock), @@ -70,17 +62,28 @@ PendingClusterState::PendingClusterState( _pendingTransitions() { logConstructionInformation(); - markAllAvailableNodesAsRequiringRequest(); - initializeBucketSpaceTransitions(); + initializeBucketSpaceTransitions(true, OutdatedNodesMap()); } PendingClusterState::~PendingClusterState() {} void -PendingClusterState::initializeBucketSpaceTransitions() +PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap) { + OutdatedNodes emptyOutdatedNodes; for (auto &elem : _bucketSpaceRepo) { - _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, *elem.second, _clusterInfo, _newClusterState, _creationTimestamp)); + auto onItr = outdatedNodesMap.find(elem.first); + const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second; + auto pendingTransition = + std::make_unique<PendingBucketSpaceDbTransition> + (*this, *elem.second, + _sender.getDistributorIndex(), distributionChanged, + outdatedNodes, + _clusterInfo, _prevClusterState, _newClusterState, _creationTimestamp); + if (pendingTransition->getBucketOwnershipTransfer()) { + _bucketOwnershipTransfer = true; + } + _pendingTransitions.emplace(elem.first, std::move(pendingTransition)); } if (shouldRequestBucketInfo()) { requestNodes(); @@ -106,33 +109,14 @@ PendingClusterState::storageNodeUpInNewState(uint16_t node) const .getState().oneOf(_clusterInfo->getStorageUpStates()); } -void -PendingClusterState::markAllAvailableNodesAsRequiringRequest() -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t i = 0; i < nodeCount; ++i) { - if (storageNodeUpInNewState(i)) { - _outdatedNodes.insert(i); - } - } -} - -void -PendingClusterState::addAdditionalNodesToOutdatedSet( - const std::unordered_set<uint16_t>& nodes) +PendingClusterState::OutdatedNodesMap +PendingClusterState::getOutdatedNodesMap() const { - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t node : nodes) { - if (node < nodeCount) { - _outdatedNodes.insert(node); - } + OutdatedNodesMap outdatedNodesMap; + for (const auto &elem : _pendingTransitions) { + outdatedNodesMap.emplace(elem.first, elem.second->getOutdatedNodes()); } -} - -std::unordered_set<uint16_t> -PendingClusterState::getOutdatedNodeSet() const -{ - return _outdatedNodes; + return outdatedNodesMap; } uint16_t @@ -170,47 +154,6 @@ PendingClusterState::iAmDown() const return myState.getState() == lib::State::DOWN; } -bool -PendingClusterState::storageNodeMayHaveLostData(uint16_t index) -{ - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldState = _prevClusterState.getNodeState(node); - - return (newState.getStartTimestamp() > oldState.getStartTimestamp()); -} - -void -PendingClusterState::updateSetOfNodesThatAreOutdated() -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t index = 0; index < nodeCount; ++index) { - if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { - _outdatedNodes.insert(index); - } - } -} - -bool -PendingClusterState::storageNodeChanged(uint16_t index) { - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldNodeState = _prevClusterState.getNodeState(node); - - // similarTo() also covers disk states. - if (!(oldNodeState.similarTo(newState))) { - LOG(debug, - "State for storage node %d has changed from '%s' to '%s', " - "updating bucket information", - index, - oldNodeState.toString().c_str(), - newState.toString().c_str()); - return true; - } - - return false; -} - void PendingClusterState::requestNodes() { @@ -225,104 +168,16 @@ PendingClusterState::requestNodes() void PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() { - for (uint16_t idx : _outdatedNodes) { - if (storageNodeUpInNewState(idx)) { - for (auto &elem : _bucketSpaceRepo) { + for (auto &elem : _pendingTransitions) { + const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes()); + for (uint16_t idx : outdatedNodes) { + if (storageNodeUpInNewState(idx)) { requestNode(BucketSpaceAndNode(elem.first, idx)); } } } } -bool -PendingClusterState::distributorChanged( - const lib::ClusterState& oldState, - const lib::ClusterState& newState) -{ - if (newState.getDistributionBitCount() != - oldState.getDistributionBitCount()) - { - return true; - } - - Node myNode(NodeType::DISTRIBUTOR, _sender.getDistributorIndex()); - if (oldState.getNodeState(myNode).getState() == - lib::State::DOWN) - { - return true; - } - - uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); - uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); - - uint16_t maxCount = std::max(oldCount, newCount); - - for (uint16_t i = 0; i < maxCount; ++i) { - Node node(NodeType::DISTRIBUTOR, i); - - const lib::State& old(oldState.getNodeState(node).getState()); - const lib::State& nw(newState.getNodeState(node).getState()); - - if (nodeWasUpButNowIsDown(old, nw)) { - if (nodeInSameGroupAsSelf(i) || - nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { - return true; - } - } - } - - return false; -} - -bool -PendingClusterState::nodeWasUpButNowIsDown(const lib::State& old, - const lib::State& nw) const -{ - return (old.oneOf("uimr") && !nw.oneOf("uimr")); -} - -bool -PendingClusterState::nodeInSameGroupAsSelf(uint16_t index) const -{ - if (_clusterInfo->nodeInSameGroupAsSelf(index)) { - LOG(debug, - "Distributor %d state changed, need to request data from all " - "storage nodes", - index); - return true; - } else { - LOG(debug, - "Distributor %d state changed but unrelated to my group.", - index); - return false; - } -} - -bool -PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown( - uint16_t nodeIndex, - const lib::ClusterState& state) const -{ - const lib::Distribution& dist(_clusterInfo->getDistribution()); - if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { - return false; // Not doing anything for downed groups. - } - const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); - // If there is no group information associated with the node (because the - // group has changed or the node has been removed from config), we must - // also invoke ownership transfer of buckets. - if (group == nullptr - || lib::Distribution::allDistributorsDown(*group, state)) - { - LOG(debug, - "Distributor %u state changed and is in a " - "group that now has no distributors remaining", - nodeIndex); - return true; - } - return false; -} - void PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index ac5c4dc35ea..460f25324d4 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -25,6 +25,8 @@ class DistributorBucketSpaceRepo; */ class PendingClusterState : public vespalib::XmlSerializable { public: + using OutdatedNodes = std::unordered_set<uint16_t>; + using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>; struct Summary { Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime); Summary(const Summary &); @@ -44,12 +46,12 @@ public: DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, - outdatedNodes, + outdatedNodesMap, creationTimestamp)); } @@ -122,7 +124,7 @@ public: * state was constructed for a distribution config change, this set will * be equal to the set of all available storage nodes. */ - std::unordered_set<uint16_t> getOutdatedNodeSet() const; + OutdatedNodesMap getOutdatedNodesMap() const; /** * Merges all the results with the corresponding bucket databases. @@ -131,11 +133,6 @@ public: // Get pending transition for a specific bucket space. Only used by unit test. PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); - /** - * Returns true if this pending state was due to a distribution bit - * change rather than an actual state change. - */ - bool distributionChange() const { return _distributionChange; } void printXml(vespalib::XmlOutputStream&) const override; Summary getSummary() const; std::string requestNodesToString() const; @@ -151,7 +148,7 @@ private: DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); /** @@ -176,15 +173,9 @@ private: } }; - void initializeBucketSpaceTransitions(); + void initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap); void logConstructionInformation() const; void requestNode(BucketSpaceAndNode bucketSpaceAndNode); - bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState); - bool storageNodeMayHaveLostData(uint16_t index); - bool storageNodeChanged(uint16_t index); - void markAllAvailableNodesAsRequiringRequest(); - void addAdditionalNodesToOutdatedSet(const std::unordered_set<uint16_t>& nodes); - void updateSetOfNodesThatAreOutdated(); void requestNodes(); void requestBucketInfoFromStorageNodesWithChangedState(); @@ -195,9 +186,6 @@ private: bool shouldRequestBucketInfo() const; bool clusterIsDown() const; bool iAmDown() const; - bool nodeInSameGroupAsSelf(uint16_t index) const; - bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; - bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const; bool storageNodeUpInNewState(uint16_t node) const; @@ -207,13 +195,6 @@ private: std::vector<bool> _requestedNodes; std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; - // Set for all nodes that may have changed state since that previous - // active cluster state, or that were marked as outdated when the pending - // cluster state was constructed. - // May be a superset of _requestedNodes, as some nodes that are outdated - // may be down and thus cannot get a request. - std::unordered_set<uint16_t> _outdatedNodes; - lib::ClusterState _prevClusterState; lib::ClusterState _newClusterState; @@ -224,7 +205,6 @@ private: DistributorMessageSender& _sender; DistributorBucketSpaceRepo &_bucketSpaceRepo; - bool _distributionChange; bool _bucketOwnershipTransfer; std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; |