From d040de10d2c4ff2a07582809068871a645a01afb Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 21 Sep 2022 11:56:09 +0000 Subject: Avoid bucket DB race during content node cluster state transition It was possible for a distributor bucket fetch request to be processed _after_ a cluster state was enabled (and internally propagated) on the content node, but _before_ all side effects of this enabling were complete and fully visible. This could cause inconsistent information to be returned to the distributor, causing nodes to get out of sync bucket metadata. This commit handles such transition periods by introducing an implicit barrier between observing the incoming command and outgoing reply for a particular cluster state version. Upon observing the reply for a version, all side effects must already be visible since the reply is only sent once internal state processing is complete (both above and below the SPI). Until initiated and completed versions converge, requests are rejected and will be transparently retried by the distributors. --- storage/src/tests/bucketdb/bucketmanagertest.cpp | 60 +++++++++++++++++++-- .../src/vespa/storage/bucketdb/bucketmanager.cpp | 62 +++++++++++++++------- storage/src/vespa/storage/bucketdb/bucketmanager.h | 15 +++--- 3 files changed, 109 insertions(+), 28 deletions(-) diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index 05c5e9e8850..6404dba7935 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -566,7 +566,7 @@ public: // Need a cluster state to work with initially, so that processing // bucket requests can calculate a target distributor. - updater_internal_cluster_state_with_current(); + update_internal_cluster_state_with_current(); } void setUp(const WithBuckets& buckets) { @@ -575,14 +575,24 @@ public: } } - void updater_internal_cluster_state_with_current() { + void update_internal_cluster_state_with_current() { _self._node->setClusterState(*_state); - _self._manager->onDown(std::make_shared(*_state)); + auto cmd = std::make_shared(*_state); + _self._manager->onDown(cmd); + // Also send up reply to release internal state transition barrier. + // We expect there to be no other pending messages at this point. + std::shared_ptr reply(cmd->makeReply()); + auto as_state_reply = std::dynamic_pointer_cast(reply); + assert(as_state_reply); + assert(_self._top->getNumReplies() == 0); + _self._manager->onUp(as_state_reply); + assert(_self._top->getNumReplies() == 1); + (void)_self._top->getRepliesOnce(); // Clear state reply sent up chain } void update_cluster_state(const lib::ClusterState& state) { _state = std::make_shared(state); - updater_internal_cluster_state_with_current(); + update_internal_cluster_state_with_current(); } auto acquireBucketLock(const document::BucketId& bucket) { @@ -1227,4 +1237,46 @@ TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_misma EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); } +// This tests a slightly different inconsistency than the above test; the node has +// locally enabled the cluster state (i.e. initially observed version == enabled version), +// but is not yet done processing side effects from doing so. +// See comments in BucketManager::onSetSystemState[Reply]() for rationale +TEST_F(BucketManagerTest, bounce_request_on_state_change_barrier_not_reached) { + ConcurrentOperationFixture f(*this); + + // Make manager-internal and component-internal version state inconsistent + f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1")); + auto new_state = lib::ClusterState("version:3 distributor:1 storage:1"); + auto state_cmd = std::make_shared(new_state); + _top->sendDown(state_cmd); + _bottom->waitForMessage(api::MessageType::SETSYSTEMSTATE, MESSAGE_WAIT_TIME); + (void)_bottom->getCommandsOnce(); + _node->setClusterState(new_state); + + // At this point, the node's internal cluster state matches that of the state command + // which was observed on the way down. But there may still be side effects pending from + // enabling the cluster state. So we must still reject requests until we have observed + // the reply for the state command (which must order after any and all side effects). + + _top->sendDown(f.createFullFetchCommand()); + auto replies = f.awaitAndGetReplies(1); + { + auto& reply = dynamic_cast(*replies[0]); + EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); + } + (void)_top->getRepliesOnce(); + + // Once the cluster state reply has been observed, requests can go through as expected. + _manager->onUp(std::shared_ptr(state_cmd->makeReply())); + _top->waitForMessage(api::MessageType::SETSYSTEMSTATE_REPLY, MESSAGE_WAIT_TIME); + (void)_top->getRepliesOnce(); + + _top->sendDown(f.createFullFetchCommand()); + replies = f.awaitAndGetReplies(1); + { + auto& reply = dynamic_cast(*replies[0]); + EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); + } +} + } // storage diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 51422de07e6..9e2b8566fdd 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -41,7 +41,8 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, _queueProcessingLock(), _queuedReplies(), _firstEqualClusterStateVersion(0), - _lastClusterStateSeen(0), + _last_cluster_state_version_initiated(0), + _last_cluster_state_version_completed(0), _lastUnifiedClusterState(""), _doneInitialized(false), _requestsCurrentlyProcessing(0), @@ -289,7 +290,7 @@ void BucketManager::run(framework::ThreadHandle& thread) bool didWork = false; BucketInfoRequestMap infoReqs; { - std::lock_guard guard(_workerLock); + std::lock_guard guard(_workerLock); infoReqs.swap(_bucketInfoRequests); } @@ -298,7 +299,7 @@ void BucketManager::run(framework::ThreadHandle& thread) } { - std::unique_lock guard(_workerLock); + std::unique_lock guard(_workerLock); for (const auto &req : infoReqs) { assert(req.second.empty()); } @@ -413,7 +414,7 @@ bool BucketManager::onRequestBucketInfo( LOG(debug, "Got request bucket info command %s", cmd->toString().c_str()); if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { - std::lock_guard guard(_workerLock); + std::lock_guard guard(_workerLock); _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd); _workerCond.notify_all(); LOG(spam, "Scheduled request bucket info request for retrieval"); @@ -492,7 +493,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard() void BucketManager::enterQueueProtectedSection() { - std::lock_guard guard(_queueProcessingLock); + std::lock_guard guard(_queueProcessingLock); ++_requestsCurrentlyProcessing; } @@ -500,7 +501,7 @@ void BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) { (void) queueGuard; // Only used to enforce guard is held while calling. - std::lock_guard guard(_queueProcessingLock); + std::lock_guard guard(_queueProcessingLock); assert(_requestsCurrentlyProcessing > 0); // Full bucket info fetches may be concurrently interleaved with bucket- // specific fetches outside of the processing thread. We only allow queued @@ -548,25 +549,28 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac clusterState->toString().c_str(), our_hash.c_str()); - std::lock_guard clusterStateGuard(_clusterStateLock); + std::lock_guard clusterStateGuard(_clusterStateLock); for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) { // Currently small requests should not be forwarded to worker thread assert((*it)->hasSystemState()); const auto their_hash = (*it)->getDistributionHash(); std::ostringstream error; - if (clusterState->getVersion() != _lastClusterStateSeen) { + if ((clusterState->getVersion() != _last_cluster_state_version_initiated) || + (_last_cluster_state_version_initiated != _last_cluster_state_version_completed)) + { // Calling onSetSystemState() on _this_ component and actually switching over // to another cluster state version does not happen atomically. Detect and // gracefully deal with the case where we're not internally in sync. error << "Inconsistent internal cluster state on node during transition; " << "failing request from distributor " << (*it)->getDistributor() << " so it can be retried. Node version is " << clusterState->getVersion() - << ", but last version seen by the bucket manager is " << _lastClusterStateSeen; - } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) { + << ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated + << ", last internally converged version is " << _last_cluster_state_version_completed; + } else if ((*it)->getSystemState().getVersion() > _last_cluster_state_version_initiated) { error << "Ignoring bucket info request for cluster state version " << (*it)->getSystemState().getVersion() << " as newest " - << "version we know of is " << _lastClusterStateSeen; + << "version we know of is " << _last_cluster_state_version_initiated; } else if ((*it)->getSystemState().getVersion() < _firstEqualClusterStateVersion) { @@ -714,20 +718,42 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd, } bool -BucketManager::onSetSystemState( - const std::shared_ptr& cmd) +BucketManager::onSetSystemState(const std::shared_ptr& cmd) { LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str()); const lib::ClusterState& state(cmd->getSystemState()); std::string unified(unifyState(state)); - std::lock_guard lock(_clusterStateLock); + std::lock_guard lock(_clusterStateLock); if (unified != _lastUnifiedClusterState - || state.getVersion() != _lastClusterStateSeen + 1) + || state.getVersion() != _last_cluster_state_version_initiated + 1) { _lastUnifiedClusterState = unified; _firstEqualClusterStateVersion = state.getVersion(); } - _lastClusterStateSeen = state.getVersion(); + // At this point, the incoming cluster state version has not yet been enabled on this node; + // it's on its merry way down to the StateManager which handles internal transitions. + // We must take note of the fact that it will _soon_ be enabled, and must avoid processing + // any bucket info requests until any and all side effects caused by this process are + // visible in the bucket DB. Failure to enforce this may cause bucket DB iterations to happen + // concurrently with transition-induced mutations, causing inconsistent results. + // We will not allow processing bucket info fetches until we've observed the _reply_ of the + // state command. Such replies are always ordered _after_ side effects of state enabling have + // become visible in the bucket DB. See onSetSystemStateReply(). + _last_cluster_state_version_initiated = state.getVersion(); + return false; +} + +bool +BucketManager::onSetSystemStateReply(const std::shared_ptr& reply) +{ + LOG(debug, "onSetSystemStateReply(%s)", reply->toString().c_str()); + std::lock_guard lock(_clusterStateLock); + // This forms part 2 of the cluster state visibility barrier that was initiated when + // we first observed the new cluster state version in onSetSystemState(). After this + // point, bucket info requests may be processed (assuming no new cluster state versions + // have arrived concurrently). In the case of racing cluster states, internal FIFO ordering + // of messages shall ensure we eventually end up with a consistent view of observed versions. + _last_cluster_state_version_completed = reply->getSystemState().getVersion(); return false; } @@ -830,7 +856,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply) { // Should very rarely contend, since persistence replies are all sent up // via a single dispatcher thread. - std::lock_guard guard(_queueProcessingLock); + std::lock_guard guard(_queueProcessingLock); if (_requestsCurrentlyProcessing == 0) { return false; // Nothing to do here; pass through reply. } @@ -867,7 +893,7 @@ bool BucketManager::enqueueAsConflictIfProcessingRequest( const api::StorageReply::SP& reply) { - std::lock_guard guard(_queueProcessingLock); + std::lock_guard guard(_queueProcessingLock); if (_requestsCurrentlyProcessing != 0) { LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", reply->toString().c_str()); diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index 8bd892e0f09..4a04331e08f 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -62,11 +62,12 @@ private: * after distributor unification is equal to all cluster states seen after. */ uint32_t _firstEqualClusterStateVersion; - /** - * The last cluster state version seen. We must ensure we dont answer to - * cluster states we haven't seen. - */ - uint32_t _lastClusterStateSeen; + // The most current cluster state versions that we've observed on the way _down_ + // through the chain, i.e. prior to being enabled on the node. + uint32_t _last_cluster_state_version_initiated; + // The most current cluster state we've observed on the way _up_ through the + // chain, i.e. after being enabled on the node. + uint32_t _last_cluster_state_version_completed; /** * The unified version of the last cluster state. */ @@ -105,6 +106,8 @@ public: void force_db_sweep_and_metric_update() { updateMetrics(true); } + bool onUp(const std::shared_ptr&) override; + private: friend struct BucketManagerTest; @@ -198,8 +201,8 @@ private: */ bool replyConflictsWithConcurrentOperation(const api::BucketReply& reply) const; bool enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply); - bool onUp(const std::shared_ptr&) override; bool onSetSystemState(const std::shared_ptr&) override; + bool onSetSystemStateReply(const std::shared_ptr&) override; bool onCreateBucket(const std::shared_ptr&) override; bool onMergeBucket(const std::shared_ptr&) override; bool onRemove(const std::shared_ptr&) override; -- cgit v1.2.3