diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-03-03 15:58:38 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-03-03 15:58:38 +0000 |
commit | 1725ce1df537041cc4894be60b16f247af241c9a (patch) | |
tree | 2c11f313bd2d184201a3adafef864f88c62eb329 | |
parent | 5d72bdae4df1e699202bc2f38212e763018036cd (diff) |
Guard against processing bucket requests with inconsistent internal state version
There's a tiny window of time between when the bucket manager observes a new
state version and when the state version actually is visible in the rest of
the process. We must ensure that we don't end up processing requests when
these two differ, or we might erroneously process requests for version X
using a state only valid for version Y < X.
-rw-r--r-- | storage/src/tests/bucketdb/bucketmanagertest.cpp | 45 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 15 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/pendingclusterstate.cpp | 2 |
3 files changed, 50 insertions, 12 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index e14bb8e11d1..b83ccccbf48 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -556,7 +556,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state("distributor:1 storage:1") + _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); @@ -566,9 +566,7 @@ public: // Need a cluster state to work with initially, so that processing // bucket requests can calculate a target distributor. - _self._node->setClusterState(_state); - _self._manager->onDown( - std::make_shared<api::SetSystemStateCommand>(_state)); + updater_internal_cluster_state_with_current(); } void setUp(const WithBuckets& buckets) { @@ -577,6 +575,16 @@ public: } } + void updater_internal_cluster_state_with_current() { + _self._node->setClusterState(*_state); + _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state)); + } + + void update_cluster_state(const lib::ClusterState& state) { + _state = std::make_shared<lib::ClusterState>(state); + updater_internal_cluster_state_with_current(); + } + auto acquireBucketLock(const document::BucketId& bucket) { return _self._node->getStorageBucketDatabase().get(bucket, "foo"); } @@ -610,15 +618,15 @@ public: } auto createFullFetchCommand() const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state); } auto createFullFetchCommandWithHash(vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash); } auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash); } auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) { @@ -726,7 +734,7 @@ group[2].nodes[2].index 5 private: BucketManagerTest& _self; - lib::ClusterState _state; + std::shared_ptr<lib::ClusterState> _state; }; TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) { @@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED } +// It's possible for the request processing thread and onSetSystemState (which use +// the same mutex) to race with the actual internal component cluster state switch-over. +// Ensure we detect and handle this by bouncing the request back to the distributor. +// It's for all intents and purposes guaranteed that the internal state has converged +// once the distributor has gotten around to retrying the operation. +TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) { + ConcurrentOperationFixture f(*this); + + // Make manager-internal and component-internal version state inconsistent + f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1")); + _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1"))); + + // Info command is sent with state version 2, which mismatches that of internal state 3 + // even though it's the same as the component's current version. + _top->sendDown(f.createFullFetchCommand()); + + auto replies = f.awaitAndGetReplies(1); + auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]); + EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); +} + } // storage diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 2d70ee8d3ba..4680414baa1 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -411,7 +411,7 @@ void BucketManager::startWorkerThread() bool BucketManager::onRequestBucketInfo( const std::shared_ptr<api::RequestBucketInfoCommand>& cmd) { - LOG(debug, "Got request bucket info command"); + LOG(debug, "Got request bucket info command %s", cmd->toString().c_str()); if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { std::lock_guard<std::mutex> guard(_workerLock); @@ -542,9 +542,10 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash(); - LOG(debug, "Processing %zu queued request bucket info commands. " + LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. " "Using cluster state '%s' and distribution hash '%s'", reqs.size(), + bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); @@ -555,7 +556,15 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac const auto their_hash = (*it)->getDistributionHash(); std::ostringstream error; - if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) { + if (clusterState->getVersion() != _lastClusterStateSeen) { + // Calling onSetSystemState() on _this_ component and actually switching over + // to another cluster state version does not happen atomically. Detect and + // gracefully deal with the case where we're not internally in sync. + error << "Inconsistent internal cluster state on node during transition; " + << "failing request from distributor " << (*it)->getDistributor() + << " so it can be retried. Node version is " << clusterState->getVersion() + << ", but last version seen by the bucket manager is " << _lastClusterStateSeen; + } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) { error << "Ignoring bucket info request for cluster state version " << (*it)->getSystemState().getVersion() << " as newest " << "version we know of is " << _lastClusterStateSeen; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index c694fda2c1b..175c7d27033 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -219,7 +219,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) "and distribution hash '%s'", bucketSpaceAndNode.bucketSpace.getId(), bucketSpaceAndNode.node, - getNewClusterStateBundleString().c_str(), + _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( |