author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-09-21 17:16:39 +0200
committer | GitHub <noreply@github.com> | 2022-09-21 17:16:39 +0200
commit | e9e69c17db2ef07566491633b21658d93aec3c39 (patch)
tree | 304729820dc27ac09823a5a0d11a9ad744cd7448 /storage
parent | d9db475220d68a54ba2c9f820d3bae78f80abd96 (diff)
parent | d040de10d2c4ff2a07582809068871a645a01afb (diff)
Merge pull request #24158 from vespa-engine/vekterli/avoid-bucket-db-race-during-cluster-state-transition
Avoid bucket DB race during content node cluster state transition [run-systemtest]
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/bucketdb/bucketmanagertest.cpp | 60
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 62
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.h | 15
3 files changed, 109 insertions(+), 28 deletions(-)
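The race this merge fixes is one of visibility ordering: a full bucket info fetch could previously be served while the side effects of enabling a new cluster state were still being applied to the bucket DB. The fix is a two-phase barrier: record the incoming state version when the SetSystemState command passes down the chain, and only resume serving bucket info requests once the corresponding reply has been observed on the way back up. As a reading aid, here is a minimal self-contained sketch of that pattern; the StateBarrier class and every name in it are illustrative inventions, not the actual Vespa types (see the real diff below for those):

// Minimal sketch of the two-phase cluster state visibility barrier.
// All names here (StateBarrier, may_serve, ...) are hypothetical.
#include <cstdint>
#include <mutex>

class StateBarrier {
    mutable std::mutex _lock;
    uint32_t _initiated = 0; // version seen on the way down (not yet fully applied)
    uint32_t _completed = 0; // version whose reply has been seen on the way up
public:
    // Command observed on its way down the storage chain.
    void on_state_command(uint32_t version) {
        std::lock_guard guard(_lock);
        _initiated = version;
    }
    // Reply observed on its way up; all side effects of enabling the state
    // are now visible in the bucket DB, so the barrier is released.
    void on_state_reply(uint32_t version) {
        std::lock_guard guard(_lock);
        _completed = version;
    }
    // A bucket info request may only be served when the node's enabled
    // version matches the initiated one AND the transition has completed.
    bool may_serve(uint32_t node_version) const {
        std::lock_guard guard(_lock);
        return (node_version == _initiated) && (_initiated == _completed);
    }
};

This mirrors the rejection condition introduced in processRequestBucketInfoCommands() below: requests bounce whenever the node's version differs from the last initiated version, or the initiated and completed versions disagree.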
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index 05c5e9e8850..6404dba7935 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -566,7 +566,7 @@ public:
         // Need a cluster state to work with initially, so that processing
         // bucket requests can calculate a target distributor.
-        updater_internal_cluster_state_with_current();
+        update_internal_cluster_state_with_current();
     }
 
     void setUp(const WithBuckets& buckets) {
@@ -575,14 +575,24 @@ public:
         }
     }
 
-    void updater_internal_cluster_state_with_current() {
+    void update_internal_cluster_state_with_current() {
         _self._node->setClusterState(*_state);
-        _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+        auto cmd = std::make_shared<api::SetSystemStateCommand>(*_state);
+        _self._manager->onDown(cmd);
+        // Also send up reply to release internal state transition barrier.
+        // We expect there to be no other pending messages at this point.
+        std::shared_ptr<api::StorageReply> reply(cmd->makeReply());
+        auto as_state_reply = std::dynamic_pointer_cast<api::SetSystemStateReply>(reply);
+        assert(as_state_reply);
+        assert(_self._top->getNumReplies() == 0);
+        _self._manager->onUp(as_state_reply);
+        assert(_self._top->getNumReplies() == 1);
+        (void)_self._top->getRepliesOnce(); // Clear state reply sent up chain
     }
 
     void update_cluster_state(const lib::ClusterState& state) {
         _state = std::make_shared<lib::ClusterState>(state);
-        updater_internal_cluster_state_with_current();
+        update_internal_cluster_state_with_current();
     }
 
     auto acquireBucketLock(const document::BucketId& bucket) {
@@ -1227,4 +1237,46 @@ TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_misma
     EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
 }
 
+// This tests a slightly different inconsistency than the above test; the node has
+// locally enabled the cluster state (i.e. initially observed version == enabled version),
+// but is not yet done processing side effects from doing so.
+// See comments in BucketManager::onSetSystemState[Reply]() for rationale
+TEST_F(BucketManagerTest, bounce_request_on_state_change_barrier_not_reached) {
+    ConcurrentOperationFixture f(*this);
+
+    // Make manager-internal and component-internal version state inconsistent
+    f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+    auto new_state = lib::ClusterState("version:3 distributor:1 storage:1");
+    auto state_cmd = std::make_shared<api::SetSystemStateCommand>(new_state);
+    _top->sendDown(state_cmd);
+    _bottom->waitForMessage(api::MessageType::SETSYSTEMSTATE, MESSAGE_WAIT_TIME);
+    (void)_bottom->getCommandsOnce();
+    _node->setClusterState(new_state);
+
+    // At this point, the node's internal cluster state matches that of the state command
+    // which was observed on the way down. But there may still be side effects pending from
+    // enabling the cluster state. So we must still reject requests until we have observed
+    // the reply for the state command (which must order after any and all side effects).
+
+    _top->sendDown(f.createFullFetchCommand());
+    auto replies = f.awaitAndGetReplies(1);
+    {
+        auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+        EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+    }
+    (void)_top->getRepliesOnce();
+
+    // Once the cluster state reply has been observed, requests can go through as expected.
+    _manager->onUp(std::shared_ptr<api::StorageReply>(state_cmd->makeReply()));
+    _top->waitForMessage(api::MessageType::SETSYSTEMSTATE_REPLY, MESSAGE_WAIT_TIME);
+    (void)_top->getRepliesOnce();
+
+    _top->sendDown(f.createFullFetchCommand());
+    replies = f.awaitAndGetReplies(1);
+    {
+        auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+        EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult());
+    }
+}
+
 } // storage
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 51422de07e6..9e2b8566fdd 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -41,7 +41,8 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
       _queueProcessingLock(),
       _queuedReplies(),
       _firstEqualClusterStateVersion(0),
-      _lastClusterStateSeen(0),
+      _last_cluster_state_version_initiated(0),
+      _last_cluster_state_version_completed(0),
       _lastUnifiedClusterState(""),
       _doneInitialized(false),
       _requestsCurrentlyProcessing(0),
@@ -289,7 +290,7 @@ void BucketManager::run(framework::ThreadHandle& thread)
     bool didWork = false;
     BucketInfoRequestMap infoReqs;
     {
-        std::lock_guard<std::mutex> guard(_workerLock);
+        std::lock_guard guard(_workerLock);
         infoReqs.swap(_bucketInfoRequests);
     }
 
@@ -298,7 +299,7 @@
     }
 
     {
-        std::unique_lock<std::mutex> guard(_workerLock);
+        std::unique_lock guard(_workerLock);
         for (const auto &req : infoReqs) {
             assert(req.second.empty());
         }
@@ -413,7 +414,7 @@ bool BucketManager::onRequestBucketInfo(
     LOG(debug, "Got request bucket info command %s", cmd->toString().c_str());
     if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
-        std::lock_guard<std::mutex> guard(_workerLock);
+        std::lock_guard guard(_workerLock);
         _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd);
         _workerCond.notify_all();
         LOG(spam, "Scheduled request bucket info request for retrieval");
@@ -492,7 +493,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard()
 void
 BucketManager::enterQueueProtectedSection()
 {
-    std::lock_guard<std::mutex> guard(_queueProcessingLock);
+    std::lock_guard guard(_queueProcessingLock);
     ++_requestsCurrentlyProcessing;
 }
 
@@ -500,7 +501,7 @@ void
 BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
 {
     (void) queueGuard; // Only used to enforce guard is held while calling.
-    std::lock_guard<std::mutex> guard(_queueProcessingLock);
+    std::lock_guard guard(_queueProcessingLock);
     assert(_requestsCurrentlyProcessing > 0);
     // Full bucket info fetches may be concurrently interleaved with bucket-
     // specific fetches outside of the processing thread.
     // We only allow queued
@@ -548,25 +549,28 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpac
             clusterState->toString().c_str(), our_hash.c_str());
 
-    std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock);
+    std::lock_guard clusterStateGuard(_clusterStateLock);
     for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) {
         // Currently small requests should not be forwarded to worker thread
         assert((*it)->hasSystemState());
         const auto their_hash = (*it)->getDistributionHash();
 
         std::ostringstream error;
-        if (clusterState->getVersion() != _lastClusterStateSeen) {
+        if ((clusterState->getVersion() != _last_cluster_state_version_initiated) ||
+            (_last_cluster_state_version_initiated != _last_cluster_state_version_completed))
+        {
             // Calling onSetSystemState() on _this_ component and actually switching over
             // to another cluster state version does not happen atomically. Detect and
             // gracefully deal with the case where we're not internally in sync.
             error << "Inconsistent internal cluster state on node during transition; "
                   << "failing request from distributor " << (*it)->getDistributor()
                   << " so it can be retried. Node version is " << clusterState->getVersion()
-                  << ", but last version seen by the bucket manager is " << _lastClusterStateSeen;
-        } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) {
+                  << ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated
+                  << ", last internally converged version is " << _last_cluster_state_version_completed;
+        } else if ((*it)->getSystemState().getVersion() > _last_cluster_state_version_initiated) {
             error << "Ignoring bucket info request for cluster state version "
                   << (*it)->getSystemState().getVersion() << " as newest "
-                  << "version we know of is " << _lastClusterStateSeen;
+                  << "version we know of is " << _last_cluster_state_version_initiated;
         } else if ((*it)->getSystemState().getVersion()
                    < _firstEqualClusterStateVersion)
         {
@@ -714,20 +718,42 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd,
 }
 
 bool
-BucketManager::onSetSystemState(
-        const std::shared_ptr<api::SetSystemStateCommand>& cmd)
+BucketManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd)
 {
     LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str());
     const lib::ClusterState& state(cmd->getSystemState());
     std::string unified(unifyState(state));
-    std::lock_guard<std::mutex> lock(_clusterStateLock);
+    std::lock_guard lock(_clusterStateLock);
     if (unified != _lastUnifiedClusterState
-        || state.getVersion() != _lastClusterStateSeen + 1)
+        || state.getVersion() != _last_cluster_state_version_initiated + 1)
     {
         _lastUnifiedClusterState = unified;
         _firstEqualClusterStateVersion = state.getVersion();
     }
-    _lastClusterStateSeen = state.getVersion();
+    // At this point, the incoming cluster state version has not yet been enabled on this node;
+    // it's on its merry way down to the StateManager which handles internal transitions.
+    // We must take note of the fact that it will _soon_ be enabled, and must avoid processing
+    // any bucket info requests until any and all side effects caused by this process are
+    // visible in the bucket DB. Failure to enforce this may cause bucket DB iterations to happen
+    // concurrently with transition-induced mutations, causing inconsistent results.
+    // We will not allow processing bucket info fetches until we've observed the _reply_ of the
+    // state command.
+    // Such replies are always ordered _after_ side effects of state enabling have
+    // become visible in the bucket DB. See onSetSystemStateReply().
+    _last_cluster_state_version_initiated = state.getVersion();
+    return false;
+}
+
+bool
+BucketManager::onSetSystemStateReply(const std::shared_ptr<api::SetSystemStateReply>& reply)
+{
+    LOG(debug, "onSetSystemStateReply(%s)", reply->toString().c_str());
+    std::lock_guard lock(_clusterStateLock);
+    // This forms part 2 of the cluster state visibility barrier that was initiated when
+    // we first observed the new cluster state version in onSetSystemState(). After this
+    // point, bucket info requests may be processed (assuming no new cluster state versions
+    // have arrived concurrently). In the case of racing cluster states, internal FIFO ordering
+    // of messages shall ensure we eventually end up with a consistent view of observed versions.
+    _last_cluster_state_version_completed = reply->getSystemState().getVersion();
     return false;
 }
 
@@ -830,7 +856,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply)
 {
     // Should very rarely contend, since persistence replies are all sent up
     // via a single dispatcher thread.
-    std::lock_guard<std::mutex> guard(_queueProcessingLock);
+    std::lock_guard guard(_queueProcessingLock);
     if (_requestsCurrentlyProcessing == 0) {
         return false; // Nothing to do here; pass through reply.
     }
@@ -867,7 +893,7 @@ bool
 BucketManager::enqueueAsConflictIfProcessingRequest(
         const api::StorageReply::SP& reply)
 {
-    std::lock_guard<std::mutex> guard(_queueProcessingLock);
+    std::lock_guard guard(_queueProcessingLock);
     if (_requestsCurrentlyProcessing != 0) {
         LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo",
             reply->toString().c_str());
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index 8bd892e0f09..4a04331e08f 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -62,11 +62,12 @@ private:
      * after distributor unification is equal to all cluster states seen after.
      */
     uint32_t _firstEqualClusterStateVersion;
-    /**
-     * The last cluster state version seen. We must ensure we dont answer to
-     * cluster states we haven't seen.
-     */
-    uint32_t _lastClusterStateSeen;
+    // The most current cluster state versions that we've observed on the way _down_
+    // through the chain, i.e. prior to being enabled on the node.
+    uint32_t _last_cluster_state_version_initiated;
+    // The most current cluster state we've observed on the way _up_ through the
+    // chain, i.e. after being enabled on the node.
+    uint32_t _last_cluster_state_version_completed;
     /**
      * The unified version of the last cluster state.
      */
@@ -105,6 +106,8 @@ public:
 
     void force_db_sweep_and_metric_update() { updateMetrics(true); }
 
+    bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
+
 private:
     friend struct BucketManagerTest;
 
@@ -198,8 +201,8 @@ private:
      */
     bool replyConflictsWithConcurrentOperation(const api::BucketReply& reply) const;
    bool enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply);
-    bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
     bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
+    bool onSetSystemStateReply(const std::shared_ptr<api::SetSystemStateReply>&) override;
     bool onCreateBucket(const std::shared_ptr<api::CreateBucketCommand>&) override;
     bool onMergeBucket(const std::shared_ptr<api::MergeBucketCommand>&) override;
     bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override;
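To make the reply-gated ordering concrete, this is how the hypothetical StateBarrier sketch from above would play out in the scenario exercised by the new bounce_request_on_state_change_barrier_not_reached test, using the same version numbers; it assumes the StateBarrier class from the earlier sketch is in scope:

// Hypothetical walkthrough of the barrier, mirroring the new unit test.
#include <cassert>

int main() {
    StateBarrier barrier;
    barrier.on_state_command(2);
    barrier.on_state_reply(2);     // steady state at version 2; requests served
    assert(barrier.may_serve(2));

    barrier.on_state_command(3);   // version 3 observed on the way down
    assert(!barrier.may_serve(3)); // bounced: side effects may still be pending
    barrier.on_state_reply(3);     // reply observed on the way up
    assert(barrier.may_serve(3));  // barrier released; requests go through
    return 0;
}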