diff options
-rw-r--r-- | storage/src/tests/bucketdb/bucketmanagertest.cpp | 45 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 15 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/pendingclusterstate.cpp | 2 |
3 files changed, 50 insertions, 12 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index e14bb8e11d1..b83ccccbf48 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -556,7 +556,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state("distributor:1 storage:1") + _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); @@ -566,9 +566,7 @@ public: // Need a cluster state to work with initially, so that processing // bucket requests can calculate a target distributor. - _self._node->setClusterState(_state); - _self._manager->onDown( - std::make_shared<api::SetSystemStateCommand>(_state)); + updater_internal_cluster_state_with_current(); } void setUp(const WithBuckets& buckets) { @@ -577,6 +575,16 @@ public: } } + void updater_internal_cluster_state_with_current() { + _self._node->setClusterState(*_state); + _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state)); + } + + void update_cluster_state(const lib::ClusterState& state) { + _state = std::make_shared<lib::ClusterState>(state); + updater_internal_cluster_state_with_current(); + } + auto acquireBucketLock(const document::BucketId& bucket) { return _self._node->getStorageBucketDatabase().get(bucket, "foo"); } @@ -610,15 +618,15 @@ public: } auto createFullFetchCommand() const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state); } auto createFullFetchCommandWithHash(vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash); } auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash); } auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) { @@ -726,7 +734,7 @@ group[2].nodes[2].index 5 private: BucketManagerTest& _self; - lib::ClusterState _state; + std::shared_ptr<lib::ClusterState> _state; }; TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) { @@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED } +// It's possible for the request processing thread and onSetSystemState (which use +// the same mutex) to race with the actual internal component cluster state switch-over. +// Ensure we detect and handle this by bouncing the request back to the distributor. +// It's for all intents and purposes guaranteed that the internal state has converged +// once the distributor has gotten around to retrying the operation. +TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) { + ConcurrentOperationFixture f(*this); + + // Make manager-internal and component-internal version state inconsistent + f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1")); + _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1"))); + + // Info command is sent with state version 2, which mismatches that of internal state 3 + // even though it's the same as the component's current version. + _top->sendDown(f.createFullFetchCommand()); + + auto replies = f.awaitAndGetReplies(1); + auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]); + EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); +} + } // storage diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 2d70ee8d3ba..4680414baa1 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -411,7 +411,7 @@ void BucketManager::startWorkerThread() bool BucketManager::onRequestBucketInfo( const std::shared_ptr<api::RequestBucketInfoCommand>& cmd) { - LOG(debug, "Got request bucket info command"); + LOG(debug, "Got request bucket info command %s", cmd->toString().c_str()); if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { std::lock_guard<std::mutex> guard(_workerLock); @@ -542,9 +542,10 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash(); - LOG(debug, "Processing %zu queued request bucket info commands. " + LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. " "Using cluster state '%s' and distribution hash '%s'", reqs.size(), + bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); @@ -555,7 +556,15 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac const auto their_hash = (*it)->getDistributionHash(); std::ostringstream error; - if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) { + if (clusterState->getVersion() != _lastClusterStateSeen) { + // Calling onSetSystemState() on _this_ component and actually switching over + // to another cluster state version does not happen atomically. Detect and + // gracefully deal with the case where we're not internally in sync. + error << "Inconsistent internal cluster state on node during transition; " + << "failing request from distributor " << (*it)->getDistributor() + << " so it can be retried. Node version is " << clusterState->getVersion() + << ", but last version seen by the bucket manager is " << _lastClusterStateSeen; + } else if ((*it)->getSystemState().getVersion() > _lastClusterStateSeen) { error << "Ignoring bucket info request for cluster state version " << (*it)->getSystemState().getVersion() << " as newest " << "version we know of is " << _lastClusterStateSeen; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index c694fda2c1b..175c7d27033 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -219,7 +219,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) "and distribution hash '%s'", bucketSpaceAndNode.bucketSpace.getId(), bucketSpaceAndNode.node, - getNewClusterStateBundleString().c_str(), + _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( |