diff options
author | Tor Egge <Tor.Egge@oath.com> | 2018-02-26 13:49:28 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2018-02-26 14:03:09 +0000 |
commit | ff1a5ed79d056a35b53ff9609638c87aac14a1f4 (patch) | |
tree | dd35c921a1a0f0d33c239fa3ff6e25a0b336d892 | |
parent | 78d0a367ca60417bd5bb86de5e58375b69fd27dc (diff) |
Enable cluster state bundle in distributor.
19 files changed, 109 insertions, 96 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 08262e30fe6..559afffc795 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -183,15 +183,16 @@ protected: bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const; ClusterInformation::CSP createClusterInfo(const std::string& clusterStateString) { - auto clusterState = std::make_shared<lib::ClusterState>(clusterStateString); + lib::ClusterState baselineClusterState(clusterStateString); + lib::ClusterStateBundle clusterStateBundle(baselineClusterState); ClusterInformation::CSP clusterInfo( new SimpleClusterInformation( getBucketDBUpdater().getDistributorComponent().getIndex(), - *clusterState, + clusterStateBundle, "ui")); auto &repo = getBucketSpaceRepo(); for (auto &elem : repo) { - elem.second->setClusterState(clusterState); + elem.second->setClusterState(clusterStateBundle.getDerivedClusterState(elem.first)); } return clusterInfo; } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index fbf5a14c052..8310266c9cb 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -240,7 +240,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id) void DistributorTestUtil::addIdealNodes(const document::BucketId& id) { - addIdealNodes(getExternalOperationHandler().getClusterState(), id); + addIdealNodes(*getExternalOperationHandler().getClusterStateBundle().getBaselineClusterState(), id); } void @@ -389,7 +389,7 @@ DistributorTestUtil::getBucketSpaces() const void DistributorTestUtil::enableDistributorClusterState(vespalib::stringref state) { - _distributor->enableClusterState(lib::ClusterState(state)); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(lib::ClusterState(state))); } } diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 7103a89229d..7401e083900 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -49,7 +49,7 @@ public: void testBlockCheckForAllOperationsToSpecificBucket(); void setSystemState(const lib::ClusterState& systemState) { - _distributor->enableClusterState(systemState); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); } CPPUNIT_TEST_SUITE(IdealStateManagerTest); diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index b339d1b4601..c265a0972af 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -93,7 +93,7 @@ struct StateCheckersTest : public CppUnit::TestFixture, void statsUpdatedWhenMergingDueToOutOfSyncCopies(); void enableClusterState(const lib::ClusterState& systemState) { - _distributor->enableClusterState(systemState); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); } void insertJoinableBuckets(); diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index b6afcd4f3ab..af580480563 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -1095,7 +1095,7 @@ void VisitorOperationTest::testVisitIdealNode() { ClusterState state("distributor:1 storage:3"); - _distributor->enableClusterState(state); + _distributor->enableClusterStateBundle(lib::ClusterStateBundle(state)); // Create buckets in bucketdb for (int i=0; i<32; i++ ) { diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 36b48a545f1..84332851340 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -63,7 +63,7 @@ BucketOwnership BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const { if (hasPendingClusterState()) { - const lib::ClusterState& state(_pendingClusterState->getNewClusterState()); + const lib::ClusterState& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace())); if (!_distributorComponent.ownsBucketInState(state, b)) { return BucketOwnership::createNotOwnedInState(state); } @@ -77,7 +77,7 @@ BucketDBUpdater::sendRequestBucketInfo( const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) { - if (!_distributorComponent.storageNodeIsUp(node)) { + if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) { return; } @@ -112,7 +112,7 @@ BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx, void BucketDBUpdater::removeSuperfluousBuckets( - const lib::ClusterState& newState) + const lib::ClusterStateBundle& newState) { for (auto &elem : _distributorComponent.getBucketSpaceRepo()) { const auto &newDistribution(elem.second->getDistribution()); @@ -123,7 +123,7 @@ BucketDBUpdater::removeSuperfluousBuckets( // being on storage nodes that are no longer up. NodeRemover proc( oldClusterState, - newState, + *newState.getDerivedClusterState(elem.first), _distributorComponent.getBucketIdFactory(), _distributorComponent.getIndex(), newDistribution, @@ -159,11 +159,11 @@ BucketDBUpdater::storageDistributionChanged() { ensureTransitionTimerStarted(); - removeSuperfluousBuckets(_distributorComponent.getClusterState()); + removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle()); ClusterInformation::CSP clusterInfo(new SimpleClusterInformation( _distributorComponent.getIndex(), - _distributorComponent.getClusterState(), + _distributorComponent.getClusterStateBundle(), _distributorComponent.getDistributor().getStorageNodeUpStates())); _pendingClusterState = PendingClusterState::createForDistributionChange( _distributorComponent.getClock(), @@ -193,21 +193,21 @@ BucketDBUpdater::onSetSystemState( "Received new cluster state %s", cmd->getSystemState().toString().c_str()); - lib::ClusterState oldState = _distributorComponent.getClusterState(); - const lib::ClusterState& state = cmd->getSystemState(); + const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle(); + const lib::ClusterStateBundle& state = cmd->getClusterStateBundle(); if (state == oldState) { return false; } ensureTransitionTimerStarted(); - removeSuperfluousBuckets(cmd->getSystemState()); + removeSuperfluousBuckets(cmd->getClusterStateBundle()); replyToPreviousPendingClusterStateIfAny(); ClusterInformation::CSP clusterInfo( new SimpleClusterInformation( _distributorComponent.getIndex(), - _distributorComponent.getClusterState(), + _distributorComponent.getClusterStateBundle(), _distributorComponent.getDistributor() .getStorageNodeUpStates())); _pendingClusterState = PendingClusterState::createForClusterStateChange( @@ -423,7 +423,7 @@ BucketDBUpdater::processSingleBucketInfoReply( BucketRequest req = iter->second; _sentMessages.erase(iter); - if (!_distributorComponent.storageNodeIsUp(req.targetNode)) { + if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) { // Ignore replies from nodes that are down. return true; } @@ -489,7 +489,7 @@ BucketDBUpdater::processCompletedPendingClusterState() _pendingClusterState->mergeIntoBucketDatabases(); if (_pendingClusterState->getCommand().get()) { - enableCurrentClusterStateInDistributor(); + enableCurrentClusterStateBundleInDistributor(); _distributorComponent.getDistributor().getMessageSender().sendDown( _pendingClusterState->getCommand()); addCurrentStateToClusterStateHistory(); @@ -504,16 +504,16 @@ BucketDBUpdater::processCompletedPendingClusterState() } void -BucketDBUpdater::enableCurrentClusterStateInDistributor() +BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() { - const lib::ClusterState& state( - _pendingClusterState->getCommand()->getSystemState()); + const lib::ClusterStateBundle& state( + _pendingClusterState->getCommand()->getClusterStateBundle()); LOG(debug, "BucketDBUpdater finished processing state %s", - state.toString().c_str()); + state.getBaselineClusterState()->toString().c_str()); - _distributorComponent.getDistributor().enableClusterState(state); + _distributorComponent.getDistributor().enableClusterStateBundle(state); } void @@ -564,7 +564,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, using namespace vespalib::xml; xos << XmlTag("bucketdb") << XmlTag("systemstate_active") - << XmlContent(_distributorComponent.getClusterState().toString()) + << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString()) << XmlEndTag(); if (_pendingClusterState) { xos << *_pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 19e2e259778..a85ee6fe4f7 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -152,11 +152,11 @@ private: void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState); - void removeSuperfluousBuckets(const lib::ClusterState& newState); + void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState); void replyToPreviousPendingClusterStateIfAny(); - void enableCurrentClusterStateInDistributor(); + void enableCurrentClusterStateBundleInDistributor(); void addCurrentStateToClusterStateHistory(); void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&); void sendAllQueuedBucketRechecks(); diff --git a/storage/src/vespa/storage/distributor/clusterinformation.cpp b/storage/src/vespa/storage/distributor/clusterinformation.cpp index cd09e4f46d4..96e94c92819 100644 --- a/storage/src/vespa/storage/distributor/clusterinformation.cpp +++ b/storage/src/vespa/storage/distributor/clusterinformation.cpp @@ -2,6 +2,7 @@ #include "clusterinformation.h" #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vdslib/state/clusterstate.h> namespace storage::distributor { @@ -9,7 +10,7 @@ namespace storage::distributor { uint16_t ClusterInformation::getStorageNodeCount() const { - return getClusterState().getNodeCount(lib::NodeType::STORAGE); + return getClusterStateBundle().getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); } } diff --git a/storage/src/vespa/storage/distributor/clusterinformation.h b/storage/src/vespa/storage/distributor/clusterinformation.h index 25f303d0f52..49abb5e8e75 100644 --- a/storage/src/vespa/storage/distributor/clusterinformation.h +++ b/storage/src/vespa/storage/distributor/clusterinformation.h @@ -10,8 +10,7 @@ namespace storage { namespace lib { -class Distribution; -class ClusterState; +class ClusterStateBundle; } @@ -26,7 +25,7 @@ public: virtual uint16_t getDistributorIndex() const = 0; - virtual const lib::ClusterState& getClusterState() const = 0; + virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0; virtual const char* getStorageUpStates() const = 0; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index fa6d63da32e..86a8ac46cbb 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -63,6 +63,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, : StorageLink("distributor"), DistributorInterface(), framework::StatusReporter("distributor", "Distributor"), + _clusterStateBundle(lib::ClusterState()), _compReg(compReg), _component(compReg, "distributor"), _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()), @@ -332,17 +333,24 @@ Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) return false; } +const lib::ClusterStateBundle& +Distributor::getClusterStateBundle() const +{ + return _clusterStateBundle; +} + void -Distributor::enableClusterState(const lib::ClusterState& state) +Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) { - lib::ClusterState oldState = _clusterState; - _clusterState = state; + lib::ClusterStateBundle oldState = _clusterStateBundle; + _clusterStateBundle = state; propagateClusterStates(); lib::Node myNode(lib::NodeType::DISTRIBUTOR, _component.getIndex()); + const auto &baselineState = *_clusterStateBundle.getBaselineClusterState(); if (!_doneInitializing && - getClusterState().getNodeState(myNode).getState() == lib::State::UP) + baselineState.getNodeState(myNode).getState() == lib::State::UP) { scanAllBuckets(); _doneInitializing = true; @@ -352,8 +360,8 @@ Distributor::enableClusterState(const lib::ClusterState& state) } // Clear all active messages on nodes that are down. - for (uint16_t i = 0; i < state.getNodeCount(lib::NodeType::STORAGE); ++i) { - if (!state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState() + for (uint16_t i = 0; i < baselineState.getNodeCount(lib::NodeType::STORAGE); ++i) { + if (!baselineState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState() .oneOf(getStorageNodeUpStates())) { std::vector<uint64_t> msgIds( @@ -537,9 +545,8 @@ Distributor::propagateDefaultDistribution( void Distributor::propagateClusterStates() { - auto clusterState = std::make_shared<lib::ClusterState>(_clusterState); for (auto &iter : *_bucketSpaceRepo) { - iter.second->setClusterState(clusterState); + iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 9ec21d6ab05..e28c6dd6578 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -78,7 +78,7 @@ public: * Enables a new cluster state. Called after the bucket db updater has * retrieved all bucket info related to the change. */ - void enableClusterState(const lib::ClusterState& clusterState) override; + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override; /** * Invoked when a pending cluster state for a distribution (config) @@ -114,9 +114,7 @@ public: */ void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t priority) override; - const lib::ClusterState& getClusterState() const override { - return _clusterState; - } + const lib::ClusterStateBundle& getClusterStateBundle() const override; /** * @return Returns the states in which the distributors consider @@ -235,7 +233,7 @@ private: void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); void propagateClusterStates(); - lib::ClusterState _clusterState; + lib::ClusterStateBundle _clusterStateBundle; DistributorComponentRegister& _compReg; storage::DistributorComponent _component; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index 8fa412ea38b..1d2465fb41a 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -3,6 +3,7 @@ #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/messageapi/storagereply.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" @@ -40,10 +41,10 @@ DistributorComponent::sendUp(const api::StorageMessage::SP& msg) _distributor.getMessageSender().sendUp(msg); } -const lib::ClusterState& -DistributorComponent::getClusterState() const +const lib::ClusterStateBundle& +DistributorComponent::getClusterStateBundle() const { - return _distributor.getClusterState(); + return _distributor.getClusterStateBundle(); }; std::vector<uint16_t> @@ -305,9 +306,9 @@ DistributorComponent::getBucketId(const document::DocumentId& docId) const } bool -DistributorComponent::storageNodeIsUp(uint32_t nodeIndex) const +DistributorComponent::storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const { - const lib::NodeState& ns = getClusterState().getNodeState( + const lib::NodeState& ns = getClusterStateBundle().getDerivedClusterState(bucketSpace)->getNodeState( lib::Node(lib::NodeType::STORAGE, nodeIndex)); return ns.getState().oneOf(_distributor.getStorageNodeUpStates()); diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index 33e86d423e7..184ac768afb 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -68,10 +68,10 @@ public: bool ownsBucketInCurrentState(const document::Bucket &bucket) const; /** - * Returns a reference to the current system state. Valid until the next - * time the distributor main thread processes its message queue. + * Returns a reference to the current cluster state bundle. Valid until the + * next time the distributor main thread processes its message queue. */ - const lib::ClusterState& getClusterState() const; + const lib::ClusterStateBundle& getClusterStateBundle() const; /** * Returns the ideal nodes for the given bucket. @@ -86,7 +86,7 @@ public: /** * Returns true if the given storage node is in an "up state". */ - bool storageNodeIsUp(uint32_t nodeIndex) const; + bool storageNodeIsUp(document::BucketSpace bucketSpace, uint32_t nodeIndex) const; /** * Verifies that the given command has been received at the diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h index 3445397c17d..17c300fa0a9 100644 --- a/storage/src/vespa/storage/distributor/distributorinterface.h +++ b/storage/src/vespa/storage/distributor/distributorinterface.h @@ -8,6 +8,7 @@ #include <vespa/document/bucket/bucket.h> namespace storage::api { class MergeBucketReply; } +namespace storage::lib { class ClusterStateBundle; } namespace storage { class DistributorConfiguration; class DistributorMetricSet; @@ -21,7 +22,7 @@ class DistributorInterface : public DistributorMessageSender public: virtual PendingMessageTracker& getPendingMessageTracker() = 0; virtual DistributorMetricSet& getMetrics() = 0; - virtual void enableClusterState(const lib::ClusterState& state) = 0; + virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0; virtual BucketOwnership checkOwnershipInPendingState(const document::Bucket &bucket) const = 0; virtual void notifyDistributionChangeEnabled() = 0; @@ -43,9 +44,9 @@ public: virtual void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t pri) = 0; /** - * @return Returns the current cluster state. + * @return Returns the current cluster state bundle. */ - virtual const lib::ClusterState& getClusterState() const = 0; + virtual const lib::ClusterStateBundle& getClusterStateBundle() const = 0; /** * Returns true if the node is currently initializing. diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index 4018bb88583..773014391fd 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -68,9 +68,10 @@ bool IdealStateManager::iAmUp() const { Node node(NodeType::DISTRIBUTOR, _distributorComponent.getIndex()); - const lib::State &nodeState = _distributorComponent.getClusterState() - .getNodeState(node).getState(); - const lib::State &clusterState = _distributorComponent.getClusterState().getClusterState(); + // Assume that derived cluster states agree on distributor node being up + const auto &state = *_distributorComponent.getClusterStateBundle().getBaselineClusterState(); + const lib::State &nodeState = state.getNodeState(node).getState(); + const lib::State &clusterState = state.getClusterState(); return (nodeState == lib::State::UP && clusterState == lib::State::UP); } @@ -278,7 +279,7 @@ void IdealStateManager::dump_bucket_space_db_status(document::BucketSpace bucket void IdealStateManager::getBucketStatus(std::ostream& out) const { LOG(debug, "Dumping bucket database valid at cluster state version %u", - _distributorComponent.getDistributor().getClusterState().getVersion()); + _distributorComponent.getDistributor().getClusterStateBundle().getVersion()); for (auto& space : _bucketSpaceRepo) { dump_bucket_space_db_status(space.first, out); diff --git a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp index 23e1081f9ae..1ea077fd1c1 100644 --- a/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/multioperationoperation.cpp @@ -119,7 +119,7 @@ MultiOperationOperation::onStart(DistributorMessageSender& sender) // Don't do anything if all nodes are down. bool up = false; for (uint16_t i = 0; i < systemState.getNodeCount(lib::NodeType::STORAGE); i++) { - if (_manager.storageNodeIsUp(i)) { + if (_manager.storageNodeIsUp(_msg->getBucket().getBucketSpace(), i)) { up = true; break; } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 3c96bc55161..3fd8b53f132 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -32,8 +32,8 @@ PendingClusterState::PendingClusterState( api::Timestamp creationTimestamp) : _cmd(newStateCmd), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), - _prevClusterState(clusterInfo->getClusterState()), - _newClusterState(newStateCmd->getSystemState()), + _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), + _newClusterStateBundle(newStateCmd->getSystemState()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), @@ -53,8 +53,8 @@ PendingClusterState::PendingClusterState( DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), - _prevClusterState(clusterInfo->getClusterState()), - _newClusterState(clusterInfo->getClusterState()), + _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), + _newClusterStateBundle(clusterInfo->getClusterStateBundle()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), @@ -79,7 +79,7 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, auto pendingTransition = std::make_unique<PendingBucketSpaceDbTransition> (*this, *elem.second, distributionChanged, outdatedNodes, - _clusterInfo, _newClusterState, _creationTimestamp); + _clusterInfo, *_newClusterStateBundle.getDerivedClusterState(elem.first), _creationTimestamp); if (pendingTransition->getBucketOwnershipTransfer()) { _bucketOwnershipTransfer = true; } @@ -99,15 +99,15 @@ PendingClusterState::logConstructionInformation() const "New PendingClusterState constructed with previous cluster " "state '%s', new cluster state '%s', distribution config " "hash: '%s'", - _prevClusterState.toString().c_str(), - _newClusterState.toString().c_str(), + getPrevClusterStateBundleString().c_str(), + getNewClusterStateBundleString().c_str(), distribution.getNodeGraph().getDistributionConfigHash().c_str()); } bool -PendingClusterState::storageNodeUpInNewState(uint16_t node) const +PendingClusterState::storageNodeUpInNewState(document::BucketSpace bucketSpace, uint16_t node) const { - return _newClusterState.getNodeState(Node(NodeType::STORAGE, node)) + return _newClusterStateBundle.getDerivedClusterState(bucketSpace)->getNodeState(Node(NodeType::STORAGE, node)) .getState().oneOf(_clusterInfo->getStorageUpStates()); } @@ -124,7 +124,7 @@ PendingClusterState::getOutdatedNodesMap() const uint16_t PendingClusterState::newStateStorageNodeCount() const { - return _newClusterState.getNodeCount(lib::NodeType::STORAGE); + return _newClusterStateBundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); } bool @@ -144,15 +144,15 @@ PendingClusterState::shouldRequestBucketInfo() const bool PendingClusterState::clusterIsDown() const { - return _newClusterState.getClusterState() == lib::State::DOWN; + return _newClusterStateBundle.getBaselineClusterState()->getClusterState() == lib::State::DOWN; } bool PendingClusterState::iAmDown() const { const lib::NodeState& myState( - _newClusterState.getNodeState(Node(NodeType::DISTRIBUTOR, - _sender.getDistributorIndex()))); + _newClusterStateBundle.getBaselineClusterState()->getNodeState(Node(NodeType::DISTRIBUTOR, + _sender.getDistributorIndex()))); return myState.getState() == lib::State::DOWN; } @@ -161,8 +161,8 @@ PendingClusterState::requestNodes() { LOG(debug, "New system state: Old state was %s, new state is %s", - _prevClusterState.toString().c_str(), - _newClusterState.toString().c_str()); + getPrevClusterStateBundleString().c_str(), + getNewClusterStateBundleString().c_str()); requestBucketInfoFromStorageNodesWithChangedState(); } @@ -173,7 +173,7 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() for (auto &elem : _pendingTransitions) { const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes()); for (uint16_t idx : outdatedNodes) { - if (storageNodeUpInNewState(idx)) { + if (storageNodeUpInNewState(elem.first, idx)) { requestNode(BucketSpaceAndNode(elem.first, idx)); } } @@ -191,14 +191,14 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) "and distribution hash '%s'", bucketSpaceAndNode.bucketSpace.getId(), bucketSpaceAndNode.node, - _newClusterState.toString().c_str(), + getNewClusterStateBundleString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( new api::RequestBucketInfoCommand( bucketSpaceAndNode.bucketSpace, _sender.getDistributorIndex(), - _newClusterState, + *_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace), distributionHash)); cmd->setPriority(api::StorageMessage::HIGH); @@ -294,7 +294,7 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const { using namespace vespalib::xml; xos << XmlTag("systemstate_pending") - << XmlAttribute("state", _newClusterState); + << XmlAttribute("state", *_newClusterStateBundle.getBaselineClusterState()); for (auto &elem : _sentMessages) { xos << XmlTag("pending") << XmlAttribute("node", elem.second.node) @@ -306,8 +306,8 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const PendingClusterState::Summary PendingClusterState::getSummary() const { - return Summary(_prevClusterState.toString(), - _newClusterState.toString(), + return Summary(getPrevClusterStateBundleString(), + getNewClusterStateBundleString(), (_clock.getTimeInMicros().getTime() - _creationTimestamp)); } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 2d75c795745..b96ba8cbbd7 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -8,6 +8,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/clock.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vespalib/util/xmlserializable.h> #include "outdated_nodes_map.h" #include <unordered_map> @@ -107,11 +108,8 @@ public: return _cmd; } - const lib::ClusterState& getNewClusterState() const { - return _newClusterState; - } - const lib::ClusterState& getPrevClusterState() const { - return _prevClusterState; + const lib::ClusterStateBundle& getNewClusterStateBundle() const { + return _newClusterStateBundle; } /** @@ -184,7 +182,13 @@ private: bool clusterIsDown() const; bool iAmDown() const; - bool storageNodeUpInNewState(uint16_t node) const; + bool storageNodeUpInNewState(document::BucketSpace bucketSpace, uint16_t node) const; + std::string getNewClusterStateBundleString() const { + return _newClusterStateBundle.getBaselineClusterState()->toString(); + } + std::string getPrevClusterStateBundleString() const { + return _prevClusterStateBundle.getBaselineClusterState()->toString(); + } std::shared_ptr<api::SetSystemStateCommand> _cmd; @@ -192,8 +196,8 @@ private: std::vector<bool> _requestedNodes; std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; - lib::ClusterState _prevClusterState; - lib::ClusterState _newClusterState; + lib::ClusterStateBundle _prevClusterStateBundle; + lib::ClusterStateBundle _newClusterStateBundle; const framework::Clock& _clock; ClusterInformation::CSP _clusterInfo; diff --git a/storage/src/vespa/storage/distributor/simpleclusterinformation.h b/storage/src/vespa/storage/distributor/simpleclusterinformation.h index 2946abf620c..1247d425e50 100644 --- a/storage/src/vespa/storage/distributor/simpleclusterinformation.h +++ b/storage/src/vespa/storage/distributor/simpleclusterinformation.h @@ -11,10 +11,10 @@ class SimpleClusterInformation : public ClusterInformation { public: SimpleClusterInformation(uint16_t myIndex, - const lib::ClusterState& clusterState, + const lib::ClusterStateBundle& clusterStateBundle, const char* storageUpStates) : _myIndex(myIndex), - _clusterState(clusterState), + _clusterStateBundle(clusterStateBundle), _storageUpStates(storageUpStates) {} @@ -22,8 +22,8 @@ public: return _myIndex; } - const lib::ClusterState& getClusterState() const override { - return _clusterState; + const lib::ClusterStateBundle& getClusterStateBundle() const override { + return _clusterStateBundle; } const char* getStorageUpStates() const override { @@ -32,7 +32,7 @@ public: private: uint16_t _myIndex; - lib::ClusterState _clusterState; + lib::ClusterStateBundle _clusterStateBundle; const char* _storageUpStates; }; |