summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2017-11-09 16:51:38 +0100
committerGitHub <noreply@github.com>2017-11-09 16:51:38 +0100
commit310675a44a01e9f671b06d63a2043cdad2cfc58a (patch)
tree2ee82fc73ebb1f8fbe358a604adde4dfeeb5fe12 /storage
parent361e3e62b28f9193b48b88d6c4fd3c89facf9862 (diff)
parentee7a78f7cded5cdddfcc76f5651126f01d36fa9f (diff)
Merge pull request #4049 from vespa-engine/toregge/change-pending-cluster-state-to-handle-multiple-bucket-spaces
Change pending cluster state to handle multiple bucket spaces
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp28
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp10
-rw-r--r--storage/src/tests/distributor/distributortestutil.h3
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp81
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h48
7 files changed, 111 insertions, 65 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index d1a54c04359..363065be65c 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -4,11 +4,13 @@
#include <iomanip>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
+#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/test/make_bucket_space.h>
#include <vespa/storage/distributor/simpleclusterinformation.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/vespalib/text/stringtokenizer.h>
@@ -16,6 +18,7 @@
using namespace storage::api;
using namespace storage::lib;
using document::test::makeDocumentBucket;
+using document::test::makeBucketSpace;
namespace storage {
namespace distributor {
@@ -538,7 +541,7 @@ public:
std::unordered_set<uint16_t> outdatedNodes;
state = PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, cmd, outdatedNodes,
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodes,
api::Timestamp(1));
}
@@ -551,7 +554,7 @@ public:
std::unordered_set<uint16_t> outdatedNodes;
state = PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, api::Timestamp(1));
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
}
};
@@ -1475,7 +1478,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1)));
sortSentMessagesByIndex(sender);
@@ -1640,7 +1643,7 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
std::unordered_set<uint16_t> outdatedNodes;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
api::Timestamp(1)));
CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size());
@@ -1668,7 +1671,8 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
state->done());
}
- CPPUNIT_ASSERT_EQUAL(3, (int)state->results().size());
+ auto &pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace());
+ CPPUNIT_ASSERT_EQUAL(3, (int)pendingTransition.results().size());
}
void
@@ -1721,13 +1725,14 @@ parseInputData(const std::string& data,
uint16_t node = atoi(tok2[0].c_str());
state.setNodeReplied(node);
+ auto &pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace());
vespalib::StringTokenizer tok3(tok2[1], ",");
for (uint32_t j = 0; j < tok3.size(); j++) {
if (includeBucketInfo) {
vespalib::StringTokenizer tok4(tok3[j], "/");
- state.addNodeInfo(
+ pendingTransition.addNodeInfo(
document::BucketId(16, atoi(tok4[0].c_str())),
BucketCopy(
timestamp,
@@ -1739,7 +1744,7 @@ parseInputData(const std::string& data,
atoi(tok4[2].c_str()),
atoi(tok4[3].c_str()))));
} else {
- state.addNodeInfo(
+ pendingTransition.addNodeInfo(
document::BucketId(16, atoi(tok3[j].c_str())),
BucketCopy(timestamp,
node,
@@ -1803,11 +1808,11 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
- state->mergeInto(getBucketDBUpdater().getDistributorComponent().getBucketDatabase());
+ state->mergeIntoBucketDatabases();
}
BucketDumper dumper_tmp(true);
@@ -1822,12 +1827,11 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, cmd, outdatedNodes,
+ clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes,
afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
- state->mergeInto(getBucketDBUpdater().getDistributorComponent()
- .getBucketDatabase());
+ state->mergeIntoBucketDatabases();
}
BucketDumper dumper(includeBucketInfo);
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 5deb31f8579..a0aa8e00070 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -358,6 +358,16 @@ DistributorTestUtil::getBucketDatabase() const {
return _distributor->getDefaultBucketSpace().getBucketDatabase();
}
+DistributorBucketSpaceRepo &
+DistributorTestUtil::getBucketSpaceRepo() {
+ return _distributor->getBucketSpaceRepo();
+}
+
+const DistributorBucketSpaceRepo &
+DistributorTestUtil::getBucketSpaceRepo() const {
+ return _distributor->getBucketSpaceRepo();
+}
+
const lib::Distribution&
DistributorTestUtil::getDistribution() const {
return _distributor->getDefaultBucketSpace().getDistribution();
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 4f09c11ac03..19da0483165 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -20,6 +20,7 @@ namespace distributor {
class BucketDBUpdater;
class Distributor;
class DistributorBucketSpace;
+class DistributorBucketSpaceRepo;
class IdealStateManager;
class ExternalOperationHandler;
class Operation;
@@ -125,6 +126,8 @@ public:
DistributorBucketSpace &getDistributorBucketSpace();
BucketDatabase& getBucketDatabase();
const BucketDatabase& getBucketDatabase() const;
+ DistributorBucketSpaceRepo &getBucketSpaceRepo();
+ const DistributorBucketSpaceRepo &getBucketSpaceRepo() const;
const lib::Distribution& getDistribution() const;
// "End to end" distribution change trigger, which will invoke the bucket
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 569136b8b10..50431bda37e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -169,6 +169,7 @@ BucketDBUpdater::storageDistributionChanged(
_bucketSpaceComponent.getClock(),
std::move(clusterInfo),
_sender,
+ _bucketSpaceComponent.getBucketSpaceRepo(),
_bucketSpaceComponent.getUniqueTimestamp());
_outdatedNodes = _pendingClusterState->getOutdatedNodeSet();
}
@@ -216,6 +217,7 @@ BucketDBUpdater::onSetSystemState(
_bucketSpaceComponent.getClock(),
std::move(clusterInfo),
_sender,
+ _bucketSpaceComponent.getBucketSpaceRepo(),
cmd,
_outdatedNodes,
_bucketSpaceComponent.getUniqueTimestamp());
@@ -486,7 +488,7 @@ BucketDBUpdater::isPendingClusterStateCompleted() const
void
BucketDBUpdater::processCompletedPendingClusterState()
{
- _pendingClusterState->mergeInto(_bucketSpaceComponent.getBucketDatabase());
+ _pendingClusterState->mergeIntoBucketDatabases();
if (_pendingClusterState->getCommand().get()) {
enableCurrentClusterStateInDistributor();
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index f59b47574ba..3cb3408a951 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -161,6 +161,8 @@ public:
DistributorBucketSpace& getDefaultBucketSpace() noexcept;
const DistributorBucketSpace& getDefaultBucketSpace() const noexcept;
+ DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; }
+ const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; }
private:
friend class Distributor_Test;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 1186cc23c5f..e8f9442c76f 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -3,6 +3,7 @@
#include "pendingclusterstate.h"
#include "pending_bucket_space_db_transition.h"
#include "bucketdbupdater.h"
+#include "distributor_bucket_space_repo.h"
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/vespalib/util/xmlstream.hpp>
@@ -23,6 +24,7 @@ PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const std::unordered_set<uint16_t>& outdatedNodes,
api::Timestamp creationTimestamp)
@@ -35,8 +37,9 @@ PendingClusterState::PendingClusterState(
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
+ _bucketSpaceRepo(bucketSpaceRepo),
_bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)),
- _pendingTransition()
+ _pendingTransitions()
{
logConstructionInformation();
if (hasBucketOwnershipTransfer()) {
@@ -45,16 +48,14 @@ PendingClusterState::PendingClusterState(
updateSetOfNodesThatAreOutdated();
addAdditionalNodesToOutdatedSet(outdatedNodes);
}
- _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp);
- if (shouldRequestBucketInfo()) {
- requestNodes();
- }
+ initializeBucketSpaceTransitions();
}
PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_outdatedNodes(clusterInfo->getStorageNodeCount()),
@@ -64,19 +65,28 @@ PendingClusterState::PendingClusterState(
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
+ _bucketSpaceRepo(bucketSpaceRepo),
_bucketOwnershipTransfer(true),
- _pendingTransition()
+ _pendingTransitions()
{
logConstructionInformation();
markAllAvailableNodesAsRequiringRequest();
- _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp);
+ initializeBucketSpaceTransitions();
+}
+
+PendingClusterState::~PendingClusterState() {}
+
+void
+PendingClusterState::initializeBucketSpaceTransitions()
+{
+ for (auto &elem : _bucketSpaceRepo) {
+ _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp));
+ }
if (shouldRequestBucketInfo()) {
requestNodes();
}
}
-PendingClusterState::~PendingClusterState() {}
-
void
PendingClusterState::logConstructionInformation() const
{
@@ -217,7 +227,9 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
{
for (uint16_t idx : _outdatedNodes) {
if (storageNodeUpInNewState(idx)) {
- requestNode(idx);
+ for (auto &elem : _bucketSpaceRepo) {
+ requestNode(BucketSpaceAndNode(elem.first, idx));
+ }
}
}
}
@@ -312,19 +324,20 @@ PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown(
}
void
-PendingClusterState::requestNode(uint16_t node)
+PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
{
vespalib::string distributionHash(_clusterInfo->getDistributionHash());
LOG(debug,
- "Requesting bucket info for node %d with cluster state '%s' "
+ "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' "
"and distribution hash '%s'",
- node,
+ bucketSpaceAndNode.bucketSpace.getId(),
+ bucketSpaceAndNode.node,
_newClusterState.toString().c_str(),
distributionHash.c_str());
std::shared_ptr<api::RequestBucketInfoCommand> cmd(
new api::RequestBucketInfoCommand(
- BucketSpace::placeHolder(),
+ bucketSpaceAndNode.bucketSpace,
_sender.getDistributorIndex(),
_newClusterState,
distributionHash));
@@ -332,9 +345,9 @@ PendingClusterState::requestNode(uint16_t node)
cmd->setPriority(api::StorageMessage::HIGH);
cmd->setTimeout(INT_MAX);
- _sentMessages[cmd->getMsgId()] = node;
+ _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode);
- _sender.sendToNode(NodeType::STORAGE, node, cmd);
+ _sender.sendToNode(NodeType::STORAGE, bucketSpaceAndNode.node, cmd);
}
@@ -358,18 +371,20 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
if (iter == _sentMessages.end()) {
return false;
}
- const uint16_t node = iter->second;
+ const BucketSpaceAndNode bucketSpaceAndNode = iter->second;
if (!reply->getResult().success()) {
framework::MilliSecTime resendTime(_clock);
resendTime += framework::MilliSecTime(100);
- _delayedRequests.push_back(std::make_pair(resendTime, node));
+ _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode);
_sentMessages.erase(iter);
return true;
}
- setNodeReplied(node);
- _pendingTransition->onRequestBucketInfoReply(*reply, node);
+ setNodeReplied(bucketSpaceAndNode.node);
+ auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace);
+ assert(transitionIter != _pendingTransitions.end());
+ transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node);
_sentMessages.erase(iter);
return true;
@@ -403,9 +418,11 @@ PendingClusterState::requestNodesToString() const
}
void
-PendingClusterState::mergeInto(BucketDatabase& db)
+PendingClusterState::mergeIntoBucketDatabases()
{
- _pendingTransition->mergeInto(db);
+ for (auto &elem : _bucketSpaceRepo) {
+ _pendingTransitions[elem.first]->mergeInto(elem.second->getBucketDatabase());
+ }
}
void
@@ -414,11 +431,9 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
using namespace vespalib::xml;
xos << XmlTag("systemstate_pending")
<< XmlAttribute("state", _newClusterState);
- for (std::map<uint64_t, uint16_t>::const_iterator iter
- = _sentMessages.begin(); iter != _sentMessages.end(); ++iter)
- {
+ for (auto &elem : _sentMessages) {
xos << XmlTag("pending")
- << XmlAttribute("node", iter->second)
+ << XmlAttribute("node", elem.second.node)
<< XmlEndTag();
}
xos << XmlEndTag();
@@ -432,16 +447,12 @@ PendingClusterState::getSummary() const
(_clock.getTimeInMicros().getTime() - _creationTimestamp));
}
-const PendingBucketSpaceDbTransition::EntryList &
-PendingClusterState::results() const
-{
- return _pendingTransition->results();
-}
-
-void
-PendingClusterState::addNodeInfo(const document::BucketId& id, const BucketCopy& copy)
+PendingBucketSpaceDbTransition &
+PendingClusterState::getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace)
{
- _pendingTransition->addNodeInfo(id, copy);
+ auto transitionIter = _pendingTransitions.find(bucketSpace);
+ assert(transitionIter != _pendingTransitions.end());
+ return *transitionIter->second;
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index b742bd3bf46..ac5c4dc35ea 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -10,14 +10,14 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlserializable.h>
#include <unordered_set>
+#include <unordered_map>
#include <deque>
-namespace storage { class BucketDatabase; }
-
namespace storage::distributor {
class DistributorMessageSender;
class PendingBucketSpaceDbTransition;
+class DistributorBucketSpaceRepo;
/**
* Class used by BucketDBUpdater to track request bucket info
@@ -42,12 +42,13 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const std::unordered_set<uint16_t>& outdatedNodes,
api::Timestamp creationTimestamp)
{
return std::unique_ptr<PendingClusterState>(
- new PendingClusterState(clock, clusterInfo, sender, newStateCmd,
+ new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd,
outdatedNodes,
creationTimestamp));
}
@@ -60,10 +61,11 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp)
{
return std::unique_ptr<PendingClusterState>(
- new PendingClusterState(clock, clusterInfo, sender, creationTimestamp));
+ new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp));
}
PendingClusterState(const PendingClusterState &) = delete;
@@ -77,8 +79,8 @@ public:
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& reply);
/**
- * Tags the given node as having replied to the
- * request bucket info command.
+ * Tags the given node as having replied to at least one of the
+ * request bucket info commands. Only used for debug logging.
*/
void setNodeReplied(uint16_t nodeIdx) {
_requestedNodes[nodeIdx] = true;
@@ -123,14 +125,11 @@ public:
std::unordered_set<uint16_t> getOutdatedNodeSet() const;
/**
- * Merges all the results with the given bucket database.
+ * Merges all the results with the corresponding bucket databases.
*/
- void mergeInto(BucketDatabase& db);
- // Get our list of information. Only used by unit test.
- const std::vector<dbtransition::Entry>& results() const;
- // Adds info from a node to our list of information. Only used by unit test.
- void addNodeInfo(const document::BucketId& id, const BucketCopy& copy);
-
+ void mergeIntoBucketDatabases();
+ // Get pending transition for a specific bucket space. Only used by unit test.
+ PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
/**
* Returns true if this pending state was due to a distribution bit
@@ -150,6 +149,7 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const std::unordered_set<uint16_t>& outdatedNodes,
api::Timestamp creationTimestamp);
@@ -162,10 +162,23 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
+ DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp);
+ struct BucketSpaceAndNode {
+ document::BucketSpace bucketSpace;
+ uint16_t node;
+ BucketSpaceAndNode(document::BucketSpace bucketSpace_,
+ uint16_t node_)
+ : bucketSpace(bucketSpace_),
+ node(node_)
+ {
+ }
+ };
+
+ void initializeBucketSpaceTransitions();
void logConstructionInformation() const;
- void requestNode(uint16_t node);
+ void requestNode(BucketSpaceAndNode bucketSpaceAndNode);
bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState);
bool storageNodeMayHaveLostData(uint16_t index);
bool storageNodeChanged(uint16_t index);
@@ -190,9 +203,9 @@ private:
std::shared_ptr<api::SetSystemStateCommand> _cmd;
- std::map<uint64_t, uint16_t> _sentMessages;
+ std::map<uint64_t, BucketSpaceAndNode> _sentMessages;
std::vector<bool> _requestedNodes;
- std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests;
+ std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests;
// Set for all nodes that may have changed state since that previous
// active cluster state, or that were marked as outdated when the pending
@@ -209,10 +222,11 @@ private:
api::Timestamp _creationTimestamp;
DistributorMessageSender& _sender;
+ DistributorBucketSpaceRepo &_bucketSpaceRepo;
bool _distributionChange;
bool _bucketOwnershipTransfer;
- std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition;
+ std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
};
}