From 33b3bdca77a0141e4f20185024f8e772209ffbef Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 14 Mar 2019 14:40:52 +0000 Subject: WIP on BucketDBUpdater explicit activation support --- .../src/tests/distributor/bucketdbupdatertest.cpp | 2 + .../vespa/storage/distributor/bucketdbupdater.cpp | 77 +++++++++++++++++++++- .../vespa/storage/distributor/bucketdbupdater.h | 6 ++ .../storage/distributor/pendingclusterstate.cpp | 4 ++ .../storage/distributor/pendingclusterstate.h | 28 +++++++- .../vespa/storage/storageserver/statemanager.cpp | 5 +- 6 files changed, 115 insertions(+), 7 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 6c70988cbc6..fd9512b2bd6 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -233,6 +233,8 @@ public: void setUp() override { createLinks(); _bucketSpaces = getBucketSpaces(); + // Disable deferred activation by default (at least for now) to avoid breaking the entire world. + getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); }; void tearDown() override { diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 9c6ec4a7f0e..20ec67fdb14 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -53,6 +53,13 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden out << "BucketDBUpdater"; } +bool +BucketDBUpdater::shouldDeferStateEnabling() const noexcept +{ + return _distributorComponent.getDistributor().getConfig() + .allowStaleReadsDuringClusterStateTransitions(); +} + bool BucketDBUpdater::hasPendingClusterState() const { @@ -195,6 +202,16 @@ BucketDBUpdater::replyToPreviousPendingClusterStateIfAny() } } +void +BucketDBUpdater::replyToActivationWithActualVersion( + const api::ActivateClusterStateVersionCommand& cmd, + uint32_t actualVersion) +{ + auto reply = std::make_shared(cmd); + reply->setActualVersion(actualVersion); + _distributorComponent.sendUp(reply); // TODO let API accept rvalues +} + bool BucketDBUpdater::onSetSystemState( const std::shared_ptr& cmd) @@ -237,6 +254,37 @@ BucketDBUpdater::onSetSystemState( return true; } +bool +BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr& cmd) +{ + // TODO test edges! + if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { + const auto pending_version = _pendingClusterState->clusterStateVersion(); + if (pending_version == cmd->version()) { + if (isPendingClusterStateCompleted()) { + assert(_pendingClusterState->isDeferred()); + activatePendingClusterState(); + } else { + LOG(error, "Received cluster state activation for pending version %u " + "without pending state being complete yet. This is not expected, " + "as no activation should be sent before all distributors have " + "reported that state processing is complete.", pending_version); + replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed). + return true; + } + } else { + replyToActivationWithActualVersion(*cmd, pending_version); + return true; + } + } else { + // Likely just a resend, but log warn for now to get a feel of how common it is. + LOG(warning, "Received cluster state activation command for version %u, which " + "has no corresponding pending state. Resent operation?", cmd->version()); + } + // Fall through to next link in call chain that cares about this message. + return false; +} + BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() { if (_reply) { @@ -496,15 +544,38 @@ BucketDBUpdater::isPendingClusterStateCompleted() const void BucketDBUpdater::processCompletedPendingClusterState() +{ + if (_pendingClusterState->isDeferred()) { + assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands. + // Sending down SetSystemState command will reach the state manager and a reply + // will be auto-sent back to the cluster controller in charge. Once this happens, + // it will send an explicit activation command once all distributors have reported + // that their pending cluster states have completed. + _distributorComponent.getDistributor().getMessageSender().sendDown( + _pendingClusterState->getCommand()); + _pendingClusterState->clearCommand(); + return; + } + // Distribution config change or non-deferred cluster state. Immediately activate + // the pending state without being told to do so explicitly. + activatePendingClusterState(); +} + +void +BucketDBUpdater::activatePendingClusterState() { _pendingClusterState->mergeIntoBucketDatabases(); - if (_pendingClusterState->getCommand().get()) { + if (_pendingClusterState->isVersionedTransition()) { enableCurrentClusterStateBundleInDistributor(); - _distributorComponent.getDistributor().getMessageSender().sendDown( - _pendingClusterState->getCommand()); + if (_pendingClusterState->hasCommand()) { + _distributorComponent.getDistributor().getMessageSender().sendDown( + _pendingClusterState->getCommand()); + } addCurrentStateToClusterStateHistory(); } else { + // TODO distribution changes cannot currently be deferred as they are not + // initiated by the cluster controller! _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 18893edaeda..973495d2007 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -44,6 +44,7 @@ public: void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); bool onSetSystemState(const std::shared_ptr& cmd) override; + bool onActivateClusterStateVersion(const std::shared_ptr& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr & repl) override; bool onMergeBucketReply(const std::shared_ptr& reply) override; bool onNotifyBucketChange(const std::shared_ptr&) override; @@ -125,6 +126,7 @@ private: } }; + bool shouldDeferStateEnabling() const noexcept; bool hasPendingClusterState() const; bool pendingClusterStateAccepted(const std::shared_ptr& repl); bool processSingleBucketInfoReply(const std::shared_ptr& repl); @@ -132,6 +134,7 @@ private: const BucketRequest& req); bool isPendingClusterStateCompleted() const; void processCompletedPendingClusterState(); + void activatePendingClusterState(); void mergeBucketInfoWithDatabase(const std::shared_ptr& repl, const BucketRequest& req); void convertBucketInfoToBucketList(const std::shared_ptr& repl, @@ -162,6 +165,9 @@ private: void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState); void replyToPreviousPendingClusterStateIfAny(); + void replyToActivationWithActualVersion( + const api::ActivateClusterStateVersionCommand& cmd, + uint32_t actualVersion); void enableCurrentClusterStateBundleInDistributor(); void addCurrentStateToClusterStateHistory(); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index ddbf87d4f24..6cba7084037 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -42,6 +42,8 @@ PendingClusterState::PendingClusterState( _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), + _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()), + _isVersionedTransition(true), _bucketOwnershipTransfer(false), _pendingTransitions() { @@ -65,6 +67,8 @@ PendingClusterState::PendingClusterState( _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), + _clusterStateVersion(0), + _isVersionedTransition(false), _bucketOwnershipTransfer(true), _pendingTransitions() { diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index cfccf36a61e..cedc0573381 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -108,10 +108,31 @@ public: return _bucketOwnershipTransfer; } + bool hasCommand() const noexcept { + return (_cmd.get() != nullptr); + } + std::shared_ptr getCommand() { return _cmd; } + bool isVersionedTransition() const noexcept { + return _isVersionedTransition; + } + + uint32_t clusterStateVersion() const noexcept { + return _clusterStateVersion; + } + + bool isDeferred() const noexcept { + return (isVersionedTransition() + && _newClusterStateBundle.deferredActivation()); + } + + void clearCommand() { + _cmd.reset(); + } + const lib::ClusterStateBundle& getNewClusterStateBundle() const { return _newClusterStateBundle; } @@ -210,9 +231,10 @@ private: api::Timestamp _creationTimestamp; DistributorMessageSender& _sender; - DistributorBucketSpaceRepo &_bucketSpaceRepo; - DistributorBucketSpaceRepo &_readOnlyBucketSpaceRepo; - + DistributorBucketSpaceRepo& _bucketSpaceRepo; + DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo; + uint32_t _clusterStateVersion; + bool _isVersionedTransition; bool _bucketOwnershipTransfer; std::unordered_map, document::BucketSpace::hash> _pendingTransitions; }; diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 9532f99dd2a..02b9a765dd8 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -518,7 +518,10 @@ bool StateManager::onActivateClusterStateVersion( const std::shared_ptr& cmd) { - // TODO invoke listeners and set actual version + // TODO we probably don't want to invoke listeners here? but just bounce with + // currently activated bundle version? + // Must ensure that layer above (i.e. distributor) maintains strict operation + // ordering. sendUp(std::make_shared(*cmd)); return true; } -- cgit v1.2.3