summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2018-02-26 13:49:28 +0000
committerTor Egge <Tor.Egge@oath.com>2018-02-26 14:03:09 +0000
commitff1a5ed79d056a35b53ff9609638c87aac14a1f4 (patch)
treedd35c921a1a0f0d33c239fa3ff6e25a0b336d892
parent78d0a367ca60417bd5bb86de5e58375b69fd27dc (diff)
Enable cluster state bundle in distributor.
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp7
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp4
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp2
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h4
-rw-r--r--storage/src/vespa/storage/distributor/clusterinformation.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/clusterinformation.h5
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp23
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributorinterface.h7
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp42
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h20
-rw-r--r--storage/src/vespa/storage/distributor/simpleclusterinformation.h10
19 files changed, 109 insertions, 96 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 08262e30fe6..559afffc795 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -183,15 +183,16 @@ protected:
bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const;
ClusterInformation::CSP createClusterInfo(const std::string& clusterStateString) {
- auto clusterState = std::make_shared<lib::ClusterState>(clusterStateString);
+ lib::ClusterState baselineClusterState(clusterStateString);
+ lib::ClusterStateBundle clusterStateBundle(baselineClusterState);
ClusterInformation::CSP clusterInfo(
new SimpleClusterInformation(
getBucketDBUpdater().getDistributorComponent().getIndex(),
- *clusterState,
+ clusterStateBundle,
"ui"));
auto &repo = getBucketSpaceRepo();
for (auto &elem : repo) {
- elem.second->setClusterState(clusterState);
+ elem.second->setClusterState(clusterStateBundle.getDerivedClusterState(elem.first));
}
return clusterInfo;
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index fbf5a14c052..8310266c9cb 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -240,7 +240,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id)
void
DistributorTestUtil::addIdealNodes(const document::BucketId& id)
{
- addIdealNodes(getExternalOperationHandler().getClusterState(), id);
+ addIdealNodes(*getExternalOperationHandler().getClusterStateBundle().getBaselineClusterState(), id);
}
void
@@ -389,7 +389,7 @@ DistributorTestUtil::getBucketSpaces() const
void
DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state)
{
- _distributor->enableClusterState(lib::ClusterState(state));
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state)));
}
}
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 7103a89229d..7401e083900 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -49,7 +49,7 @@ public:
void testBlockCheckForAllOperationsToSpecificBucket();
void setSystemState(const lib::ClusterState& systemState) {
- _distributor->enableClusterState(systemState);
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
}
CPPUNIT_TEST_SUITE(IdealStateManagerTest);
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index b339d1b4601..c265a0972af 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -93,7 +93,7 @@ struct StateCheckersTest : public CppUnit::TestFixture,
void statsUpdatedWhenMergingDueToOutOfSyncCopies();
void enableClusterState(const lib::ClusterState& systemState) {
- _distributor->enableClusterState(systemState);
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
}
void insertJoinableBuckets();
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index b6afcd4f3ab..af580480563 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -1095,7 +1095,7 @@ void
VisitorOperationTest::testVisitIdealNode()
{
ClusterState state("distributor:1 storage:3");
- _distributor->enableClusterState(state);
+ _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state));
// Create buckets in bucketdb
for (int i=0; i<32; i++ ) {
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 36b48a545f1..84332851340 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -63,7 +63,7 @@ BucketOwnership
BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
{
if (hasPendingClusterState()) {
- const lib::ClusterState& state(_pendingClusterState->getNewClusterState());
+ const lib::ClusterState& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace()));
if (!_distributorComponent.ownsBucketInState(state, b)) {
return BucketOwnership::createNotOwnedInState(state);
}
@@ -77,7 +77,7 @@ BucketDBUpdater::sendRequestBucketInfo(
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
- if (!_distributorComponent.storageNodeIsUp(node)) {
+ if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) {
return;
}
@@ -112,7 +112,7 @@ BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
void
BucketDBUpdater::removeSuperfluousBuckets(
- const lib::ClusterState& newState)
+ const lib::ClusterStateBundle& newState)
{
for (auto &elem : _distributorComponent.getBucketSpaceRepo()) {
const auto &newDistribution(elem.second->getDistribution());
@@ -123,7 +123,7 @@ BucketDBUpdater::removeSuperfluousBuckets(
// being on storage nodes that are no longer up.
NodeRemover proc(
oldClusterState,
- newState,
+ *newState.getDerivedClusterState(elem.first),
_distributorComponent.getBucketIdFactory(),
_distributorComponent.getIndex(),
newDistribution,
@@ -159,11 +159,11 @@ BucketDBUpdater::storageDistributionChanged()
{
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(_distributorComponent.getClusterState());
+ removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle());
ClusterInformation::CSP clusterInfo(new SimpleClusterInformation(
_distributorComponent.getIndex(),
- _distributorComponent.getClusterState(),
+ _distributorComponent.getClusterStateBundle(),
_distributorComponent.getDistributor().getStorageNodeUpStates()));
_pendingClusterState = PendingClusterState::createForDistributionChange(
_distributorComponent.getClock(),
@@ -193,21 +193,21 @@ BucketDBUpdater::onSetSystemState(
"Received new cluster state %s",
cmd->getSystemState().toString().c_str());
- lib::ClusterState oldState = _distributorComponent.getClusterState();
- const lib::ClusterState& state = cmd->getSystemState();
+ const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle();
+ const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
if (state == oldState) {
return false;
}
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(cmd->getSystemState());
+ removeSuperfluousBuckets(cmd->getClusterStateBundle());
replyToPreviousPendingClusterStateIfAny();
ClusterInformation::CSP clusterInfo(
new SimpleClusterInformation(
_distributorComponent.getIndex(),
- _distributorComponent.getClusterState(),
+ _distributorComponent.getClusterStateBundle(),
_distributorComponent.getDistributor()
.getStorageNodeUpStates()));
_pendingClusterState = PendingClusterState::createForClusterStateChange(
@@ -423,7 +423,7 @@ BucketDBUpdater::processSingleBucketInfoReply(
BucketRequest req = iter->second;
_sentMessages.erase(iter);
- if (!_distributorComponent.storageNodeIsUp(req.targetNode)) {
+ if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) {
// Ignore replies from nodes that are down.
return true;
}
@@ -489,7 +489,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
_pendingClusterState->mergeIntoBucketDatabases();
if (_pendingClusterState->getCommand().get()) {
- enableCurrentClusterStateInDistributor();
+ enableCurrentClusterStateBundleInDistributor();
_distributorComponent.getDistributor().getMessageSender().sendDown(
_pendingClusterState->getCommand());
addCurrentStateToClusterStateHistory();
@@ -504,16 +504,16 @@ BucketDBUpdater::processCompletedPendingClusterState()
}
void
-BucketDBUpdater::enableCurrentClusterStateInDistributor()
+BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
{
- const lib::ClusterState& state(
- _pendingClusterState->getCommand()->getSystemState());
+ const lib::ClusterStateBundle& state(
+ _pendingClusterState->getCommand()->getClusterStateBundle());
LOG(debug,
"BucketDBUpdater finished processing state %s",
- state.toString().c_str());
+ state.getBaselineClusterState()->toString().c_str());
- _distributorComponent.getDistributor().enableClusterState(state);
+ _distributorComponent.getDistributor().enableClusterStateBundle(state);
}
void
@@ -564,7 +564,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
- << XmlContent(_distributorComponent.getClusterState().toString())
+ << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString())
<< XmlEndTag();
if (_pendingClusterState) {
xos << *_pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 19e2e259778..a85ee6fe4f7 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -152,11 +152,11 @@ private:
void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
- void removeSuperfluousBuckets(const lib::ClusterState& newState);
+ void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState);
void replyToPreviousPendingClusterStateIfAny();
- void enableCurrentClusterStateInDistributor();
+ void enableCurrentClusterStateBundleInDistributor();
void addCurrentStateToClusterStateHistory();
void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
void sendAllQueuedBucketRechecks();
diff --git a/storage/src/vespa/storage/distributor/clusterinformation.cpp b/storage/src/vespa/storage/distributor/clusterinformation.cpp
index cd09e4f46d4..96e94c92819 100644
--- a/storage/src/vespa/storage/distributor/clusterinformation.cpp
+++ b/storage/src/vespa/storage/distributor/clusterinformation.cpp
@@ -2,6 +2,7 @@
#include "clusterinformation.h"
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
namespace storage::distributor {
@@ -9,7 +10,7 @@ namespace storage::distributor {
uint16_t
ClusterInformation::getStorageNodeCount() const
{
- return getClusterState().getNodeCount(lib::NodeType::STORAGE);
+ return getClusterStateBundle().getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
}
}
diff --git a/storage/src/vespa/storage/distributor/clusterinformation.h b/storage/src/vespa/storage/distributor/clusterinformation.h
index 25f303d0f52..49abb5e8e75 100644
--- a/storage/src/vespa/storage/distributor/clusterinformation.h
+++ b/storage/src/vespa/storage/distributor/clusterinformation.h
@@ -10,8 +10,7 @@ namespace storage {
namespace lib {
-class Distribution;
-class ClusterState;
+class ClusterStateBundle;
}
@@ -26,7 +25,7 @@ public:
virtual uint16_t getDistributorIndex() const = 0;
- virtual const lib::ClusterState& getClusterState() const = 0;
+ virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0;
virtual const char* getStorageUpStates() const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index fa6d63da32e..86a8ac46cbb 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -63,6 +63,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
: StorageLink("distributor"),
DistributorInterface(),
framework::StatusReporter("distributor", "Distributor"),
+ _clusterStateBundle(lib::ClusterState()),
_compReg(compReg),
_component(compReg, "distributor"),
_bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()),
@@ -332,17 +333,24 @@ Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg)
return false;
}
+const lib::ClusterStateBundle&
+Distributor::getClusterStateBundle() const
+{
+ return _clusterStateBundle;
+}
+
void
-Distributor::enableClusterState(const lib::ClusterState& state)
+Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
{
- lib::ClusterState oldState = _clusterState;
- _clusterState = state;
+ lib::ClusterStateBundle oldState = _clusterStateBundle;
+ _clusterStateBundle = state;
propagateClusterStates();
lib::Node myNode(lib::NodeType::DISTRIBUTOR, _component.getIndex());
+ const auto &baselineState = *_clusterStateBundle.getBaselineClusterState();
if (!_doneInitializing &&
- getClusterState().getNodeState(myNode).getState() == lib::State::UP)
+ baselineState.getNodeState(myNode).getState() == lib::State::UP)
{
scanAllBuckets();
_doneInitializing = true;
@@ -352,8 +360,8 @@ Distributor::enableClusterState(const lib::ClusterState& state)
}
// Clear all active messages on nodes that are down.
- for (uint16_t i = 0; i < state.getNodeCount(lib::NodeType::STORAGE); ++i) {
- if (!state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState()
+ for (uint16_t i = 0; i < baselineState.getNodeCount(lib::NodeType::STORAGE); ++i) {
+ if (!baselineState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState()
.oneOf(getStorageNodeUpStates()))
{
std::vector<uint64_t> msgIds(
@@ -537,9 +545,8 @@ Distributor::propagateDefaultDistribution(
void
Distributor::propagateClusterStates()
{
- auto clusterState = std::make_shared<lib::ClusterState>(_clusterState);
for (auto &iter : *_bucketSpaceRepo) {
- iter.second->setClusterState(clusterState);
+ iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first));
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 9ec21d6ab05..e28c6dd6578 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -78,7 +78,7 @@ public:
* Enables a new cluster state. Called after the bucket db updater has
* retrieved all bucket info related to the change.
*/
- void enableClusterState(const lib::ClusterState& clusterState) override;
+ void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override;
/**
* Invoked when a pending cluster state for a distribution (config)
@@ -114,9 +114,7 @@ public:
*/
void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t priority) override;
- const lib::ClusterState& getClusterState() const override {
- return _clusterState;
- }
+ const lib::ClusterStateBundle& getClusterStateBundle() const override;
/**
* @return Returns the states in which the distributors consider
@@ -235,7 +233,7 @@ private:
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
void propagateClusterStates();
- lib::ClusterState _clusterState;
+ lib::ClusterStateBundle _clusterStateBundle;
DistributorComponentRegister& _compReg;
storage::DistributorComponent _component;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 8fa412ea38b..1d2465fb41a 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -3,6 +3,7 @@
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/messageapi/storagereply.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
@@ -40,10 +41,10 @@ DistributorComponent::sendUp(const api::StorageMessage::SP& msg)
_distributor.getMessageSender().sendUp(msg);
}
-const lib::ClusterState&
-DistributorComponent::getClusterState() const
+const lib::ClusterStateBundle&
+DistributorComponent::getClusterStateBundle() const
{
- return _distributor.getClusterState();
+ return _distributor.getClusterStateBundle();
};
std::vector<uint16_t>
@@ -305,9 +306,9 @@ DistributorComponent::getBucketId(const document::DocumentId& docId) const
}
bool
-DistributorComponent::storageNodeIsUp(uint32_t nodeIndex) const
+DistributorComponent::storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const
{
- const lib::NodeState& ns = getClusterState().getNodeState(
+ const lib::NodeState& ns = getClusterStateBundle().getDerivedClusterState(bucketSpace)->getNodeState(
lib::Node(lib::NodeType::STORAGE, nodeIndex));
return ns.getState().oneOf(_distributor.getStorageNodeUpStates());
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 33e86d423e7..184ac768afb 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -68,10 +68,10 @@ public:
bool ownsBucketInCurrentState(const document::Bucket &bucket) const;
/**
- * Returns a reference to the current system state. Valid until the next
- * time the distributor main thread processes its message queue.
+ * Returns a reference to the current cluster state bundle. Valid until the
+ * next time the distributor main thread processes its message queue.
*/
- const lib::ClusterState& getClusterState() const;
+ const lib::ClusterStateBundle& getClusterStateBundle() const;
/**
* Returns the ideal nodes for the given bucket.
@@ -86,7 +86,7 @@ public:
/**
* Returns true if the given storage node is in an "up state".
*/
- bool storageNodeIsUp(uint32_t nodeIndex) const;
+ bool storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const;
/**
* Verifies that the given command has been received at the
diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h
index 3445397c17d..17c300fa0a9 100644
--- a/storage/src/vespa/storage/distributor/distributorinterface.h
+++ b/storage/src/vespa/storage/distributor/distributorinterface.h
@@ -8,6 +8,7 @@
#include <vespa/document/bucket/bucket.h>
namespace storage::api { class MergeBucketReply; }
+namespace storage::lib { class ClusterStateBundle; }
namespace storage {
class DistributorConfiguration;
class DistributorMetricSet;
@@ -21,7 +22,7 @@ class DistributorInterface : public DistributorMessageSender
public:
virtual PendingMessageTracker& getPendingMessageTracker() = 0;
virtual DistributorMetricSet& getMetrics() = 0;
- virtual void enableClusterState(const lib::ClusterState& state) = 0;
+ virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0;
virtual BucketOwnership checkOwnershipInPendingState(const document::Bucket &bucket) const = 0;
virtual void notifyDistributionChangeEnabled() = 0;
@@ -43,9 +44,9 @@ public:
virtual void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t pri) = 0;
/**
- * @return Returns the current cluster state.
+ * @return Returns the current cluster state bundle.
*/
- virtual const lib::ClusterState& getClusterState() const = 0;
+ virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0;
/**
* Returns true if the node is currently initializing.
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 4018bb88583..773014391fd 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -68,9 +68,10 @@ bool
IdealStateManager::iAmUp() const
{
Node node(NodeType::DISTRIBUTOR, _distributorComponent.getIndex());
- const lib::State &nodeState = _distributorComponent.getClusterState()
- .getNodeState(node).getState();
- const lib::State &clusterState = _distributorComponent.getClusterState().getClusterState();
+ // Assume that derived cluster states agree on distributor node being up
+ const auto &state = *_distributorComponent.getClusterStateBundle().getBaselineClusterState();
+ const lib::State &nodeState = state.getNodeState(node).getState();
+ const lib::State &clusterState = state.getClusterState();
return (nodeState == lib::State::UP && clusterState == lib::State::UP);
}
@@ -278,7 +279,7 @@ void IdealStateManager::dump_bucket_space_db_status(document::BucketSpace bucket
void IdealStateManager::getBucketStatus(std::ostream& out) const {
LOG(debug, "Dumping bucket database valid at cluster state version %u",
- _distributorComponent.getDistributor().getClusterState().getVersion());
+ _distributorComponent.getDistributor().getClusterStateBundle().getVersion());
for (auto& space : _bucketSpaceRepo) {
dump_bucket_space_db_status(space.first, out);
diff --git a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp
index 23e1081f9ae..1ea077fd1c1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp
@@ -119,7 +119,7 @@ MultiOperationOperation::onStart(DistributorMessageSender& sender)
// Don't do anything if all nodes are down.
bool up = false;
for (uint16_t i = 0; i < systemState.getNodeCount(lib::NodeType::STORAGE); i++) {
- if (_manager.storageNodeIsUp(i)) {
+ if (_manager.storageNodeIsUp(_msg->getBucket().getBucketSpace(), i)) {
up = true;
break;
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 3c96bc55161..3fd8b53f132 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -32,8 +32,8 @@ PendingClusterState::PendingClusterState(
api::Timestamp creationTimestamp)
: _cmd(newStateCmd),
_requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)),
- _prevClusterState(clusterInfo->getClusterState()),
- _newClusterState(newStateCmd->getSystemState()),
+ _prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
+ _newClusterStateBundle(newStateCmd->getSystemState()),
_clock(clock),
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
@@ -53,8 +53,8 @@ PendingClusterState::PendingClusterState(
DistributorBucketSpaceRepo &bucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
- _prevClusterState(clusterInfo->getClusterState()),
- _newClusterState(clusterInfo->getClusterState()),
+ _prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
+ _newClusterStateBundle(clusterInfo->getClusterStateBundle()),
_clock(clock),
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
@@ -79,7 +79,7 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged,
auto pendingTransition =
std::make_unique<PendingBucketSpaceDbTransition>
(*this, *elem.second, distributionChanged, outdatedNodes,
- _clusterInfo, _newClusterState, _creationTimestamp);
+ _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp);
if (pendingTransition->getBucketOwnershipTransfer()) {
_bucketOwnershipTransfer = true;
}
@@ -99,15 +99,15 @@ PendingClusterState::logConstructionInformation() const
"New PendingClusterState constructed with previous cluster "
"state '%s', new cluster state '%s', distribution config "
"hash: '%s'",
- _prevClusterState.toString().c_str(),
- _newClusterState.toString().c_str(),
+ getPrevClusterStateBundleString().c_str(),
+ getNewClusterStateBundleString().c_str(),
distribution.getNodeGraph().getDistributionConfigHash().c_str());
}
bool
-PendingClusterState::storageNodeUpInNewState(uint16_t node) const
+PendingClusterState::storageNodeUpInNewState(document::BucketSpace bucketSpace, uint16_t node) const
{
- return _newClusterState.getNodeState(Node(NodeType::STORAGE, node))
+ return _newClusterStateBundle.getDerivedClusterState(bucketSpace)->getNodeState(Node(NodeType::STORAGE, node))
.getState().oneOf(_clusterInfo->getStorageUpStates());
}
@@ -124,7 +124,7 @@ PendingClusterState::getOutdatedNodesMap() const
uint16_t
PendingClusterState::newStateStorageNodeCount() const
{
- return _newClusterState.getNodeCount(lib::NodeType::STORAGE);
+ return _newClusterStateBundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
}
bool
@@ -144,15 +144,15 @@ PendingClusterState::shouldRequestBucketInfo() const
bool
PendingClusterState::clusterIsDown() const
{
- return _newClusterState.getClusterState() == lib::State::DOWN;
+ return _newClusterStateBundle.getBaselineClusterState()->getClusterState() == lib::State::DOWN;
}
bool
PendingClusterState::iAmDown() const
{
const lib::NodeState& myState(
- _newClusterState.getNodeState(Node(NodeType::DISTRIBUTOR,
- _sender.getDistributorIndex())));
+ _newClusterStateBundle.getBaselineClusterState()->getNodeState(Node(NodeType::DISTRIBUTOR,
+ _sender.getDistributorIndex())));
return myState.getState() == lib::State::DOWN;
}
@@ -161,8 +161,8 @@ PendingClusterState::requestNodes()
{
LOG(debug,
"New system state: Old state was %s, new state is %s",
- _prevClusterState.toString().c_str(),
- _newClusterState.toString().c_str());
+ getPrevClusterStateBundleString().c_str(),
+ getNewClusterStateBundleString().c_str());
requestBucketInfoFromStorageNodesWithChangedState();
}
@@ -173,7 +173,7 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
for (auto &elem : _pendingTransitions) {
const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes());
for (uint16_t idx : outdatedNodes) {
- if (storageNodeUpInNewState(idx)) {
+ if (storageNodeUpInNewState(elem.first, idx)) {
requestNode(BucketSpaceAndNode(elem.first, idx));
}
}
@@ -191,14 +191,14 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
"and distribution hash '%s'",
bucketSpaceAndNode.bucketSpace.getId(),
bucketSpaceAndNode.node,
- _newClusterState.toString().c_str(),
+ getNewClusterStateBundleString().c_str(),
distributionHash.c_str());
std::shared_ptr<api::RequestBucketInfoCommand> cmd(
new api::RequestBucketInfoCommand(
bucketSpaceAndNode.bucketSpace,
_sender.getDistributorIndex(),
- _newClusterState,
+ *_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace),
distributionHash));
cmd->setPriority(api::StorageMessage::HIGH);
@@ -294,7 +294,7 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
{
using namespace vespalib::xml;
xos << XmlTag("systemstate_pending")
- << XmlAttribute("state", _newClusterState);
+ << XmlAttribute("state", *_newClusterStateBundle.getBaselineClusterState());
for (auto &elem : _sentMessages) {
xos << XmlTag("pending")
<< XmlAttribute("node", elem.second.node)
@@ -306,8 +306,8 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const
PendingClusterState::Summary
PendingClusterState::getSummary() const
{
- return Summary(_prevClusterState.toString(),
- _newClusterState.toString(),
+ return Summary(getPrevClusterStateBundleString(),
+ getNewClusterStateBundleString(),
(_clock.getTimeInMicros().getTime() - _creationTimestamp));
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 2d75c795745..b96ba8cbbd7 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -8,6 +8,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/util/xmlserializable.h>
#include "outdated_nodes_map.h"
#include <unordered_map>
@@ -107,11 +108,8 @@ public:
return _cmd;
}
- const lib::ClusterState& getNewClusterState() const {
- return _newClusterState;
- }
- const lib::ClusterState& getPrevClusterState() const {
- return _prevClusterState;
+ const lib::ClusterStateBundle& getNewClusterStateBundle() const {
+ return _newClusterStateBundle;
}
/**
@@ -184,7 +182,13 @@ private:
bool clusterIsDown() const;
bool iAmDown() const;
- bool storageNodeUpInNewState(uint16_t node) const;
+ bool storageNodeUpInNewState(document::BucketSpace bucketSpace, uint16_t node) const;
+ std::string getNewClusterStateBundleString() const {
+ return _newClusterStateBundle.getBaselineClusterState()->toString();
+ }
+ std::string getPrevClusterStateBundleString() const {
+ return _prevClusterStateBundle.getBaselineClusterState()->toString();
+ }
std::shared_ptr<api::SetSystemStateCommand> _cmd;
@@ -192,8 +196,8 @@ private:
std::vector<bool> _requestedNodes;
std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests;
- lib::ClusterState _prevClusterState;
- lib::ClusterState _newClusterState;
+ lib::ClusterStateBundle _prevClusterStateBundle;
+ lib::ClusterStateBundle _newClusterStateBundle;
const framework::Clock& _clock;
ClusterInformation::CSP _clusterInfo;
diff --git a/storage/src/vespa/storage/distributor/simpleclusterinformation.h b/storage/src/vespa/storage/distributor/simpleclusterinformation.h
index 2946abf620c..1247d425e50 100644
--- a/storage/src/vespa/storage/distributor/simpleclusterinformation.h
+++ b/storage/src/vespa/storage/distributor/simpleclusterinformation.h
@@ -11,10 +11,10 @@ class SimpleClusterInformation : public ClusterInformation
{
public:
SimpleClusterInformation(uint16_t myIndex,
- const lib::ClusterState& clusterState,
+ const lib::ClusterStateBundle& clusterStateBundle,
const char* storageUpStates)
: _myIndex(myIndex),
- _clusterState(clusterState),
+ _clusterStateBundle(clusterStateBundle),
_storageUpStates(storageUpStates)
{}
@@ -22,8 +22,8 @@ public:
return _myIndex;
}
- const lib::ClusterState& getClusterState() const override {
- return _clusterState;
+ const lib::ClusterStateBundle& getClusterStateBundle() const override {
+ return _clusterStateBundle;
}
const char* getStorageUpStates() const override {
@@ -32,7 +32,7 @@ public:
private:
uint16_t _myIndex;
- lib::ClusterState _clusterState;
+ lib::ClusterStateBundle _clusterStateBundle;
const char* _storageUpStates;
};