summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@oath.com> 2017-11-10 13:05:00 +0000
committer Tor Egge <Tor.Egge@oath.com> 2017-11-10 13:05:00 +0000
commit 3665c480590ead529527d5bfc9f2445e0eabdfa3 (patch)
tree 88c4911d07e8b2b6644116fb97b123c4a1bc5012 /storage
parent c70c1047b80c0beac1c11fdb6b11bcd5b6efcb5b (diff)
Track outdated nodes per bucket space.
Use distributor bucket space to get distribution when detecting outdated nodes.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/bucketdbupdatertest.cpp 17
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.cpp 8
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.h 4
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp 194
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h 27
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.cpp 201
-rw-r--r-- storage/src/vespa/storage/distributor/pendingclusterstate.h 34
7 files changed, 266 insertions, 219 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 363065be65c..e792eb5a48d 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -157,6 +157,8 @@ protected:
}
public:
+ using OutdatedNodes = std::unordered_set<uint16_t>;
+ using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>;
void setUp() override {
createLinks();
};
@@ -539,9 +541,9 @@ public:
ClusterInformation::CSP clusterInfo(
owner.createClusterInfo(oldClusterState));
- std::unordered_set<uint16_t> outdatedNodes;
+ OutdatedNodesMap outdatedNodesMap;
state = PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodes,
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap,
api::Timestamp(1));
}
@@ -552,7 +554,6 @@ public:
ClusterInformation::CSP clusterInfo(
owner.createClusterInfo(oldClusterState));
- std::unordered_set<uint16_t> outdatedNodes;
state = PendingClusterState::createForDistributionChange(
clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
}
@@ -1640,10 +1641,10 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
framework::defaultimplementation::FakeClock clock;
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
- std::unordered_set<uint16_t> outdatedNodes;
+ OutdatedNodesMap outdatedNodesMap;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
api::Timestamp(1)));
CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size());
@@ -1798,7 +1799,7 @@ BucketDBUpdaterTest::mergeBucketLists(
framework::MilliSecTimer timer(clock);
MessageSenderStub sender;
- std::unordered_set<uint16_t> outdatedNodes;
+ OutdatedNodesMap outdatedNodesMap;
{
auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState));
@@ -1808,7 +1809,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
@@ -1827,7 +1828,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap,
afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 50431bda37e..b68e53c7136 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -171,7 +171,7 @@ BucketDBUpdater::storageDistributionChanged(
_sender,
_bucketSpaceComponent.getBucketSpaceRepo(),
_bucketSpaceComponent.getUniqueTimestamp());
- _outdatedNodes = _pendingClusterState->getOutdatedNodeSet();
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
}
void
@@ -219,9 +219,9 @@ BucketDBUpdater::onSetSystemState(
_sender,
_bucketSpaceComponent.getBucketSpaceRepo(),
cmd,
- _outdatedNodes,
+ _outdatedNodesMap,
_bucketSpaceComponent.getUniqueTimestamp());
- _outdatedNodes = _pendingClusterState->getOutdatedNodeSet();
+ _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
if (isPendingClusterStateCompleted()) {
processCompletedPendingClusterState();
@@ -500,7 +500,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
}
_pendingClusterState.reset();
- _outdatedNodes.clear();
+ _outdatedNodesMap.clear();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 994e207f200..6399519e5f8 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -27,6 +27,8 @@ class BucketDBUpdater : public framework::StatusReporter,
public api::MessageHandler
{
public:
+ using OutdatedNodes = std::unordered_set<uint16_t>;
+ using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>;
BucketDBUpdater(Distributor& owner,
DistributorBucketSpaceRepo &bucketSpaceRepo,
DistributorBucketSpace& bucketSpace,
@@ -226,7 +228,7 @@ private:
std::list<PendingClusterState::Summary> _history;
DistributorMessageSender& _sender;
std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
- std::unordered_set<uint16_t> _outdatedNodes;
+ OutdatedNodesMap _outdatedNodesMap;
framework::MilliSecTimer _transitionTimer;
};
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index c2b4358333a..506c2b1e4a3 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -12,9 +12,17 @@ LOG_SETUP(".pendingbucketspacedbtransition");
namespace storage::distributor {
+using lib::Node;
+using lib::NodeType;
+using lib::NodeState;
+
PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
DistributorBucketSpace &distributorBucketSpace,
+ int distributorIndex,
+ bool distributionChanged,
+ const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
+ const lib::ClusterState &prevClusterState,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp)
: _entries(),
@@ -22,12 +30,24 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus
_removedBuckets(),
_missingEntries(),
_clusterInfo(std::move(clusterInfo)),
- _outdatedNodes(pendingClusterState.getOutdatedNodeSet()),
+ _outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)),
+ _prevClusterState(prevClusterState),
_newClusterState(newClusterState),
_creationTimestamp(creationTimestamp),
_pendingClusterState(pendingClusterState),
- _distributorBucketSpace(distributorBucketSpace)
+ _distributorBucketSpace(distributorBucketSpace),
+ _distributorIndex(distributorIndex),
+ _bucketOwnershipTransfer(distributionChanged)
{
+ if (distributorChanged()) {
+ _bucketOwnershipTransfer = true;
+ }
+ if (_bucketOwnershipTransfer) {
+ markAllAvailableNodesAsRequiringRequest();
+ } else {
+ updateSetOfNodesThatAreOutdated();
+ addAdditionalNodesToOutdatedSet(outdatedNodes);
+ }
}
PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition()
@@ -70,10 +90,12 @@ PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Ra
std::vector<BucketCopy> copiesToAddOrUpdate(
getCopiesThatAreNewOrAltered(info, range));
+ const auto &dist(_distributorBucketSpace.getDistribution());
std::vector<uint16_t> order(
- _clusterInfo->getIdealStorageNodesForState(
+ dist.getIdealStorageNodes(
_newClusterState,
- _entries[range.first].bucketId));
+ _entries[range.first].bucketId,
+ _clusterInfo->getStorageUpStates()));
info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER);
LOG_BUCKET_OPERATION_NO_LOCK(
@@ -228,6 +250,170 @@ PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucke
}
}
+bool
+PendingBucketSpaceDbTransition::distributorChanged()
+{
+ const auto &oldState(_prevClusterState);
+ const auto &newState(_newClusterState);
+ if (newState.getDistributionBitCount() != oldState.getDistributionBitCount()) {
+ return true;
+ }
+
+ Node myNode(NodeType::DISTRIBUTOR, _distributorIndex);
+ if (oldState.getNodeState(myNode).getState() == lib::State::DOWN) {
+ return true;
+ }
+
+ uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR);
+ uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR);
+
+ uint16_t maxCount = std::max(oldCount, newCount);
+
+ for (uint16_t i = 0; i < maxCount; ++i) {
+ Node node(NodeType::DISTRIBUTOR, i);
+
+ const lib::State& old(oldState.getNodeState(node).getState());
+ const lib::State& nw(newState.getNodeState(node).getState());
+
+ if (nodeWasUpButNowIsDown(old, nw)) {
+ if (nodeInSameGroupAsSelf(i) ||
+ nodeNeedsOwnershipTransferFromGroupDown(i, newState)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+bool
+PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old,
+ const lib::State& nw)
+{
+ return (old.oneOf("uimr") && !nw.oneOf("uimr"));
+}
+
+bool
+PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const
+{
+ const auto &dist(_distributorBucketSpace.getDistribution());
+ if (dist.getNodeGraph().getGroupForNode(index) ==
+ dist.getNodeGraph().getGroupForNode(_distributorIndex)) {
+ LOG(debug,
+ "Distributor %d state changed, need to request data from all "
+ "storage nodes",
+ index);
+ return true;
+ } else {
+ LOG(debug,
+ "Distributor %d state changed but unrelated to my group.",
+ index);
+ return false;
+ }
+}
+
+bool
+PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown(
+ uint16_t nodeIndex,
+ const lib::ClusterState& state) const
+{
+ const auto &dist(_distributorBucketSpace.getDistribution());
+ if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) {
+ return false; // Not doing anything for downed groups.
+ }
+ const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex));
+ // If there is no group information associated with the node (because the
+ // group has changed or the node has been removed from config), we must
+ // also invoke ownership transfer of buckets.
+ if (group == nullptr
+ || lib::Distribution::allDistributorsDown(*group, state))
+ {
+ LOG(debug,
+ "Distributor %u state changed and is in a "
+ "group that now has no distributors remaining",
+ nodeIndex);
+ return true;
+ }
+ return false;
+}
+
+uint16_t
+PendingBucketSpaceDbTransition::newStateStorageNodeCount() const
+{
+ return _newClusterState.getNodeCount(lib::NodeType::STORAGE);
+}
+
+bool
+PendingBucketSpaceDbTransition::storageNodeMayHaveLostData(uint16_t index)
+{
+ Node node(NodeType::STORAGE, index);
+ NodeState newState = _newClusterState.getNodeState(node);
+ NodeState oldState = _prevClusterState.getNodeState(node);
+
+ return (newState.getStartTimestamp() > oldState.getStartTimestamp());
+}
+
+void
+PendingBucketSpaceDbTransition::updateSetOfNodesThatAreOutdated()
+{
+ const uint16_t nodeCount(newStateStorageNodeCount());
+ for (uint16_t index = 0; index < nodeCount; ++index) {
+ if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) {
+ _outdatedNodes.insert(index);
+ }
+ }
+}
+
+bool
+PendingBucketSpaceDbTransition::storageNodeChanged(uint16_t index) {
+ Node node(NodeType::STORAGE, index);
+ NodeState newState = _newClusterState.getNodeState(node);
+ NodeState oldNodeState = _prevClusterState.getNodeState(node);
+
+ // similarTo() also covers disk states.
+ if (!(oldNodeState.similarTo(newState))) {
+ LOG(debug,
+ "State for storage node %d has changed from '%s' to '%s', "
+ "updating bucket information",
+ index,
+ oldNodeState.toString().c_str(),
+ newState.toString().c_str());
+ return true;
+ }
+
+ return false;
+}
+
+bool
+PendingBucketSpaceDbTransition::storageNodeUpInNewState(uint16_t node) const
+{
+ return _newClusterState.getNodeState(Node(NodeType::STORAGE, node))
+ .getState().oneOf(_clusterInfo->getStorageUpStates());
+}
+
+void
+PendingBucketSpaceDbTransition::markAllAvailableNodesAsRequiringRequest()
+{
+ const uint16_t nodeCount(newStateStorageNodeCount());
+ for (uint16_t i = 0; i < nodeCount; ++i) {
+ if (storageNodeUpInNewState(i)) {
+ _outdatedNodes.insert(i);
+ }
+ }
+}
+
+void
+PendingBucketSpaceDbTransition::addAdditionalNodesToOutdatedSet(
+ const std::unordered_set<uint16_t>& nodes)
+{
+ const uint16_t nodeCount(newStateStorageNodeCount());
+ for (uint16_t node : nodes) {
+ if (node < nodeCount) {
+ _outdatedNodes.insert(node);
+ }
+ }
+}
+
void
PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy)
{
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index 730d6540820..0619218b6a4 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -6,7 +6,7 @@
#include <unordered_set>
namespace storage::api { class RequestBucketInfoReply; }
-namespace storage::lib { class ClusterState; }
+namespace storage::lib { class ClusterState; class State; }
namespace storage::distributor {
@@ -24,6 +24,7 @@ class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProces
public:
using Entry = dbtransition::Entry;
using EntryList = std::vector<Entry>;
+ using OutdatedNodes = std::unordered_set<uint16_t>;
private:
using Range = std::pair<uint32_t, uint32_t>;
@@ -38,12 +39,15 @@ private:
// cluster state was constructed.
// May be a superset of _requestedNodes, as some nodes that are outdated
// may be down and thus cannot get a request.
- const std::unordered_set<uint16_t> _outdatedNodes;
+ OutdatedNodes _outdatedNodes;
+ const lib::ClusterState &_prevClusterState;
const lib::ClusterState &_newClusterState;
const api::Timestamp _creationTimestamp;
const PendingClusterState &_pendingClusterState;
DistributorBucketSpace &_distributorBucketSpace;
+ int _distributorIndex;
+ bool _bucketOwnershipTransfer;
// BucketDataBase::MutableEntryProcessor API
bool process(BucketDatabase::Entry& e) override;
@@ -73,10 +77,26 @@ private:
bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const;
std::string requestNodesToString();
+ bool distributorChanged();
+ static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw);
+ bool storageNodeUpInNewState(uint16_t node) const;
+ bool nodeInSameGroupAsSelf(uint16_t index) const;
+ bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const;
+ uint16_t newStateStorageNodeCount() const;
+ bool storageNodeMayHaveLostData(uint16_t index);
+ bool storageNodeChanged(uint16_t index);
+ void markAllAvailableNodesAsRequiringRequest();
+ void addAdditionalNodesToOutdatedSet(const OutdatedNodes &nodes);
+ void updateSetOfNodesThatAreOutdated();
+
public:
PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState,
DistributorBucketSpace &distributorBucketSpace,
+ int distributorIndex,
+ bool distributionChanged,
+ const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
+ const lib::ClusterState &prevClusterState,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp);
~PendingBucketSpaceDbTransition();
@@ -87,6 +107,9 @@ public:
// Adds the info from the reply to our list of information.
void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node);
+ const OutdatedNodes &getOutdatedNodes() { return _outdatedNodes; }
+ bool getBucketOwnershipTransfer() const { return _bucketOwnershipTransfer; }
+
// Methods used by unit tests.
const EntryList& results() const { return _entries; }
void addNodeInfo(const document::BucketId& id, const BucketCopy& copy);
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 833899140d8..f8a80569e1d 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -26,11 +26,10 @@ PendingClusterState::PendingClusterState(
DistributorMessageSender& sender,
DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
- const std::unordered_set<uint16_t>& outdatedNodes,
+ const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
: _cmd(newStateCmd),
_requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
- _outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
_prevClusterState(clusterInfo->getClusterState()),
_newClusterState(newStateCmd->getSystemState()),
_clock(clock),
@@ -38,17 +37,11 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
- _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)),
+ _bucketOwnershipTransfer(false),
_pendingTransitions()
{
logConstructionInformation();
- if (hasBucketOwnershipTransfer()) {
- markAllAvailableNodesAsRequiringRequest();
- } else {
- updateSetOfNodesThatAreOutdated();
- addAdditionalNodesToOutdatedSet(outdatedNodes);
- }
- initializeBucketSpaceTransitions();
+ initializeBucketSpaceTransitions(false, outdatedNodesMap);
}
PendingClusterState::PendingClusterState(
@@ -58,7 +51,6 @@ PendingClusterState::PendingClusterState(
DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
- _outdatedNodes(clusterInfo->getStorageNodeCount()),
_prevClusterState(clusterInfo->getClusterState()),
_newClusterState(clusterInfo->getClusterState()),
_clock(clock),
@@ -70,17 +62,28 @@ PendingClusterState::PendingClusterState(
_pendingTransitions()
{
logConstructionInformation();
- markAllAvailableNodesAsRequiringRequest();
- initializeBucketSpaceTransitions();
+ initializeBucketSpaceTransitions(true, OutdatedNodesMap());
}
PendingClusterState::~PendingClusterState() {}
void
-PendingClusterState::initializeBucketSpaceTransitions()
+PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap)
{
+ OutdatedNodes emptyOutdatedNodes;
for (auto &elem : _bucketSpaceRepo) {
- _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, *elem.second, _clusterInfo, _newClusterState, _creationTimestamp));
+ auto onItr = outdatedNodesMap.find(elem.first);
+ const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second;
+ auto pendingTransition =
+ std::make_unique<PendingBucketSpaceDbTransition>
+ (*this, *elem.second,
+ _sender.getDistributorIndex(), distributionChanged,
+ outdatedNodes,
+ _clusterInfo, _prevClusterState, _newClusterState, _creationTimestamp);
+ if (pendingTransition->getBucketOwnershipTransfer()) {
+ _bucketOwnershipTransfer = true;
+ }
+ _pendingTransitions.emplace(elem.first, std::move(pendingTransition));
}
if (shouldRequestBucketInfo()) {
requestNodes();
@@ -106,33 +109,14 @@ PendingClusterState::storageNodeUpInNewState(uint16_t node) const
.getState().oneOf(_clusterInfo->getStorageUpStates());
}
-void
-PendingClusterState::markAllAvailableNodesAsRequiringRequest()
-{
- const uint16_t nodeCount(newStateStorageNodeCount());
- for (uint16_t i = 0; i < nodeCount; ++i) {
- if (storageNodeUpInNewState(i)) {
- _outdatedNodes.insert(i);
- }
- }
-}
-
-void
-PendingClusterState::addAdditionalNodesToOutdatedSet(
- const std::unordered_set<uint16_t>& nodes)
+PendingClusterState::OutdatedNodesMap
+PendingClusterState::getOutdatedNodesMap() const
{
- const uint16_t nodeCount(newStateStorageNodeCount());
- for (uint16_t node : nodes) {
- if (node < nodeCount) {
- _outdatedNodes.insert(node);
- }
+ OutdatedNodesMap outdatedNodesMap;
+ for (const auto &elem : _pendingTransitions) {
+ outdatedNodesMap.emplace(elem.first, elem.second->getOutdatedNodes());
}
-}
-
-std::unordered_set<uint16_t>
-PendingClusterState::getOutdatedNodeSet() const
-{
- return _outdatedNodes;
+ return outdatedNodesMap;
}
uint16_t
@@ -170,47 +154,6 @@ PendingClusterState::iAmDown() const
return myState.getState() == lib::State::DOWN;
}
-bool
-PendingClusterState::storageNodeMayHaveLostData(uint16_t index)
-{
- Node node(NodeType::STORAGE, index);
- NodeState newState = _newClusterState.getNodeState(node);
- NodeState oldState = _prevClusterState.getNodeState(node);
-
- return (newState.getStartTimestamp() > oldState.getStartTimestamp());
-}
-
-void
-PendingClusterState::updateSetOfNodesThatAreOutdated()
-{
- const uint16_t nodeCount(newStateStorageNodeCount());
- for (uint16_t index = 0; index < nodeCount; ++index) {
- if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) {
- _outdatedNodes.insert(index);
- }
- }
-}
-
-bool
-PendingClusterState::storageNodeChanged(uint16_t index) {
- Node node(NodeType::STORAGE, index);
- NodeState newState = _newClusterState.getNodeState(node);
- NodeState oldNodeState = _prevClusterState.getNodeState(node);
-
- // similarTo() also covers disk states.
- if (!(oldNodeState.similarTo(newState))) {
- LOG(debug,
- "State for storage node %d has changed from '%s' to '%s', "
- "updating bucket information",
- index,
- oldNodeState.toString().c_str(),
- newState.toString().c_str());
- return true;
- }
-
- return false;
-}
-
void
PendingClusterState::requestNodes()
{
@@ -225,104 +168,16 @@ PendingClusterState::requestNodes()
void
PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
{
- for (uint16_t idx : _outdatedNodes) {
- if (storageNodeUpInNewState(idx)) {
- for (auto &elem : _bucketSpaceRepo) {
+ for (auto &elem : _pendingTransitions) {
+ const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes());
+ for (uint16_t idx : outdatedNodes) {
+ if (storageNodeUpInNewState(idx)) {
requestNode(BucketSpaceAndNode(elem.first, idx));
}
}
}
}
-bool
-PendingClusterState::distributorChanged(
- const lib::ClusterState& oldState,
- const lib::ClusterState& newState)
-{
- if (newState.getDistributionBitCount() !=
- oldState.getDistributionBitCount())
- {
- return true;
- }
-
- Node myNode(NodeType::DISTRIBUTOR, _sender.getDistributorIndex());
- if (oldState.getNodeState(myNode).getState() ==
- lib::State::DOWN)
- {
- return true;
- }
-
- uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR);
- uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR);
-
- uint16_t maxCount = std::max(oldCount, newCount);
-
- for (uint16_t i = 0; i < maxCount; ++i) {
- Node node(NodeType::DISTRIBUTOR, i);
-
- const lib::State& old(oldState.getNodeState(node).getState());
- const lib::State& nw(newState.getNodeState(node).getState());
-
- if (nodeWasUpButNowIsDown(old, nw)) {
- if (nodeInSameGroupAsSelf(i) ||
- nodeNeedsOwnershipTransferFromGroupDown(i, newState)) {
- return true;
- }
- }
- }
-
- return false;
-}
-
-bool
-PendingClusterState::nodeWasUpButNowIsDown(const lib::State& old,
- const lib::State& nw) const
-{
- return (old.oneOf("uimr") && !nw.oneOf("uimr"));
-}
-
-bool
-PendingClusterState::nodeInSameGroupAsSelf(uint16_t index) const
-{
- if (_clusterInfo->nodeInSameGroupAsSelf(index)) {
- LOG(debug,
- "Distributor %d state changed, need to request data from all "
- "storage nodes",
- index);
- return true;
- } else {
- LOG(debug,
- "Distributor %d state changed but unrelated to my group.",
- index);
- return false;
- }
-}
-
-bool
-PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown(
- uint16_t nodeIndex,
- const lib::ClusterState& state) const
-{
- const lib::Distribution& dist(_clusterInfo->getDistribution());
- if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) {
- return false; // Not doing anything for downed groups.
- }
- const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex));
- // If there is no group information associated with the node (because the
- // group has changed or the node has been removed from config), we must
- // also invoke ownership transfer of buckets.
- if (group == nullptr
- || lib::Distribution::allDistributorsDown(*group, state))
- {
- LOG(debug,
- "Distributor %u state changed and is in a "
- "group that now has no distributors remaining",
- nodeIndex);
- return true;
- }
- return false;
-}
-
void
PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
{
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index ac5c4dc35ea..460f25324d4 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -25,6 +25,8 @@ class DistributorBucketSpaceRepo;
*/
class PendingClusterState : public vespalib::XmlSerializable {
public:
+ using OutdatedNodes = std::unordered_set<uint16_t>;
+ using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>;
struct Summary {
Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime);
Summary(const Summary &);
@@ -44,12 +46,12 @@ public:
DistributorMessageSender& sender,
DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
- const std::unordered_set<uint16_t>& outdatedNodes,
+ const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
{
return std::unique_ptr<PendingClusterState>(
new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd,
- outdatedNodes,
+ outdatedNodesMap,
creationTimestamp));
}
@@ -122,7 +124,7 @@ public:
* state was constructed for a distribution config change, this set will
* be equal to the set of all available storage nodes.
*/
- std::unordered_set<uint16_t> getOutdatedNodeSet() const;
+ OutdatedNodesMap getOutdatedNodesMap() const;
/**
* Merges all the results with the corresponding bucket databases.
@@ -131,11 +133,6 @@ public:
// Get pending transition for a specific bucket space. Only used by unit test.
PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
- /**
- * Returns true if this pending state was due to a distribution bit
- * change rather than an actual state change.
- */
- bool distributionChange() const { return _distributionChange; }
void printXml(vespalib::XmlOutputStream&) const override;
Summary getSummary() const;
std::string requestNodesToString() const;
@@ -151,7 +148,7 @@ private:
DistributorMessageSender& sender,
DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
- const std::unordered_set<uint16_t>& outdatedNodes,
+ const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp);
/**
@@ -176,15 +173,9 @@ private:
}
};
- void initializeBucketSpaceTransitions();
+ void initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap);
void logConstructionInformation() const;
void requestNode(BucketSpaceAndNode bucketSpaceAndNode);
- bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState);
- bool storageNodeMayHaveLostData(uint16_t index);
- bool storageNodeChanged(uint16_t index);
- void markAllAvailableNodesAsRequiringRequest();
- void addAdditionalNodesToOutdatedSet(const std::unordered_set<uint16_t>& nodes);
- void updateSetOfNodesThatAreOutdated();
void requestNodes();
void requestBucketInfoFromStorageNodesWithChangedState();
@@ -195,9 +186,6 @@ private:
bool shouldRequestBucketInfo() const;
bool clusterIsDown() const;
bool iAmDown() const;
- bool nodeInSameGroupAsSelf(uint16_t index) const;
- bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const;
- bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const;
bool storageNodeUpInNewState(uint16_t node) const;
@@ -207,13 +195,6 @@ private:
std::vector<bool> _requestedNodes;
std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests;
- // Set for all nodes that may have changed state since that previous
- // active cluster state, or that were marked as outdated when the pending
- // cluster state was constructed.
- // May be a superset of _requestedNodes, as some nodes that are outdated
- // may be down and thus cannot get a request.
- std::unordered_set<uint16_t> _outdatedNodes;
-
lib::ClusterState _prevClusterState;
lib::ClusterState _newClusterState;
@@ -224,7 +205,6 @@ private:
DistributorMessageSender& _sender;
DistributorBucketSpaceRepo &_bucketSpaceRepo;
- bool _distributionChange;
bool _bucketOwnershipTransfer;
std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
};