summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-11-08 13:51:05 +0000
committerTor Egge <Tor.Egge@oath.com>2017-11-08 14:41:04 +0000
commit7a0ffc114735b225c9fb9c9d9941313c5a29c725 (patch)
treebcfcb68c1d330206e7a71de6f162f1d5ac142329
parent4e79e473c7fc914d709bbe6d2af56fd1fb9ccb83 (diff)
Change pending cluster state to handle more than one bucket space.
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp47
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h20
2 files changed, 43 insertions, 24 deletions
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 256d0fd407f..a78b66265bd 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -39,7 +39,7 @@ PendingClusterState::PendingClusterState(
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
_bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)),
- _pendingTransition()
+ _pendingTransitions()
{
logConstructionInformation();
if (hasBucketOwnershipTransfer()) {
@@ -67,7 +67,7 @@ PendingClusterState::PendingClusterState(
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
_bucketOwnershipTransfer(true),
- _pendingTransition()
+ _pendingTransitions()
{
logConstructionInformation();
markAllAvailableNodesAsRequiringRequest();
@@ -79,7 +79,9 @@ PendingClusterState::~PendingClusterState() {}
void
PendingClusterState::constructorHelper()
{
- _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp);
+ for (auto &elem : _bucketSpaceRepo) {
+ _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp));
+ }
if (shouldRequestBucketInfo()) {
requestNodes();
}
@@ -225,7 +227,9 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
{
for (uint16_t idx : _outdatedNodes) {
if (storageNodeUpInNewState(idx)) {
- requestNode(idx);
+ for (auto &elem : _bucketSpaceRepo) {
+ requestNode(BucketSpaceAndNode(elem.first, idx));
+ }
}
}
}
@@ -318,19 +322,19 @@ PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown(
}
void
-PendingClusterState::requestNode(uint16_t node)
+PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
{
vespalib::string distributionHash(_clusterInfo->getDistributionHash());
LOG(debug,
"Requesting bucket info for node %d with cluster state '%s' "
"and distribution hash '%s'",
- node,
+ bucketSpaceAndNode.node,
_newClusterState.toString().c_str(),
distributionHash.c_str());
std::shared_ptr<api::RequestBucketInfoCommand> cmd(
new api::RequestBucketInfoCommand(
- BucketSpace::placeHolder(),
+ bucketSpaceAndNode.bucketSpace,
_sender.getDistributorIndex(),
_newClusterState,
distributionHash));
@@ -338,9 +342,9 @@ PendingClusterState::requestNode(uint16_t node)
cmd->setPriority(api::StorageMessage::HIGH);
cmd->setTimeout(INT_MAX);
- _sentMessages[cmd->getMsgId()] = node;
+ _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode);
- _sender.sendToNode(NodeType::STORAGE, node, cmd);
+ _sender.sendToNode(NodeType::STORAGE, bucketSpaceAndNode.node, cmd);
}
@@ -364,18 +368,20 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
if (iter == _sentMessages.end()) {
return false;
}
- const uint16_t node = iter->second;
+ const BucketSpaceAndNode bucketSpaceAndNode = iter->second;
if (!reply->getResult().success()) {
framework::MilliSecTime resendTime(_clock);
resendTime += framework::MilliSecTime(100);
- _delayedRequests.push_back(std::make_pair(resendTime, node));
+ _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode);
_sentMessages.erase(iter);
return true;
}
- setNodeReplied(node);
- _pendingTransition->onRequestBucketInfoReply(*reply, node);
+ setNodeReplied(bucketSpaceAndNode.node);
+ auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace);
+ assert(transitionIter != _pendingTransitions.end());
+ transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node);
_sentMessages.erase(iter);
return true;
@@ -411,7 +417,9 @@ PendingClusterState::requestNodesToString() const
void
PendingClusterState::mergeIntoBucketDatabases()
{
- _pendingTransition->mergeInto(_bucketSpaceRepo.get(BucketSpace::placeHolder()).getBucketDatabase());
+ for (auto &elem : _bucketSpaceRepo) {
+ _pendingTransitions[elem.first]->mergeInto(elem.second->getBucketDatabase());
+ }
}
void
@@ -420,11 +428,9 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
using namespace vespalib::xml;
xos << XmlTag("systemstate_pending")
<< XmlAttribute("state", _newClusterState);
- for (std::map<uint64_t, uint16_t>::const_iterator iter
- = _sentMessages.begin(); iter != _sentMessages.end(); ++iter)
- {
+ for (auto &elem : _sentMessages) {
xos << XmlTag("pending")
- << XmlAttribute("node", iter->second)
+ << XmlAttribute("node", elem.second.node)
<< XmlEndTag();
}
xos << XmlEndTag();
@@ -441,8 +447,9 @@ PendingClusterState::getSummary() const
PendingBucketSpaceDbTransition &
PendingClusterState::getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace)
{
- (void) bucketSpace;
- return *_pendingTransition;
+ auto transitionIter = _pendingTransitions.find(bucketSpace);
+ assert(transitionIter != _pendingTransitions.end());
+ return *transitionIter->second;
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 62b4f8c2b5a..3deed06342e 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -10,6 +10,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlserializable.h>
#include <unordered_set>
+#include <unordered_map>
#include <deque>
namespace storage::distributor {
@@ -164,9 +165,20 @@ private:
DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp);
+ struct BucketSpaceAndNode {
+ document::BucketSpace bucketSpace;
+ uint16_t node;
+ BucketSpaceAndNode(document::BucketSpace bucketSpace_,
+ uint16_t node_)
+ : bucketSpace(bucketSpace_),
+ node(node_)
+ {
+ }
+ };
+
void constructorHelper();
void logConstructionInformation() const;
- void requestNode(uint16_t node);
+ void requestNode(BucketSpaceAndNode bucketSpaceAndNode);
bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState);
bool storageNodeMayHaveLostData(uint16_t index);
bool storageNodeChanged(uint16_t index);
@@ -191,9 +203,9 @@ private:
std::shared_ptr<api::SetSystemStateCommand> _cmd;
- std::map<uint64_t, uint16_t> _sentMessages;
+ std::map<uint64_t, BucketSpaceAndNode> _sentMessages;
std::vector<bool> _requestedNodes;
- std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests;
+ std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests;
// Set for all nodes that may have changed state since that previous
// active cluster state, or that were marked as outdated when the pending
@@ -214,7 +226,7 @@ private:
bool _distributionChange;
bool _bucketOwnershipTransfer;
- std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition;
+ std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
};
}