From 1725ce1df537041cc4894be60b16f247af241c9a Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 3 Mar 2021 15:58:38 +0000 Subject: Guard against processing bucket requests with inconsistent internal state version There's a tiny window of time between when the bucket manager observes a new state version and when the state version actually is visible in the rest of the process. We must ensure that we don't end up processing requests when these two differ, or we might erroneously process requests for version X using a state only valid for version Y < X. --- storage/src/tests/bucketdb/bucketmanagertest.cpp | 45 +++++++++++++++++++----- 1 file changed, 37 insertions(+), 8 deletions(-) (limited to 'storage/src/tests/bucketdb/bucketmanagertest.cpp') diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index e14bb8e11d1..b83ccccbf48 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -556,7 +556,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state("distributor:1 storage:1") + _state(std::make_shared("distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); @@ -566,9 +566,7 @@ public: // Need a cluster state to work with initially, so that processing // bucket requests can calculate a target distributor. - _self._node->setClusterState(_state); - _self._manager->onDown( - std::make_shared(_state)); + updater_internal_cluster_state_with_current(); } void setUp(const WithBuckets& buckets) { @@ -577,6 +575,16 @@ public: } } + void updater_internal_cluster_state_with_current() { + _self._node->setClusterState(*_state); + _self._manager->onDown(std::make_shared(*_state)); + } + + void update_cluster_state(const lib::ClusterState& state) { + _state = std::make_shared(state); + updater_internal_cluster_state_with_current(); + } + auto acquireBucketLock(const document::BucketId& bucket) { return _self._node->getStorageBucketDatabase().get(bucket, "foo"); } @@ -610,15 +618,15 @@ public: } auto createFullFetchCommand() const { - return std::make_shared(makeBucketSpace(), 0, _state); + return std::make_shared(makeBucketSpace(), 0, *_state); } auto createFullFetchCommandWithHash(vespalib::stringref hash) const { - return std::make_shared(makeBucketSpace(), 0, _state, hash); + return std::make_shared(makeBucketSpace(), 0, *_state, hash); } auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const { - return std::make_shared(space, 0, _state, hash); + return std::make_shared(space, 0, *_state, hash); } auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) { @@ -726,7 +734,7 @@ group[2].nodes[2].index 5 private: BucketManagerTest& _self; - lib::ClusterState _state; + std::shared_ptr _state; }; TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) { @@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED } +// It's possible for the request processing thread and onSetSystemState (which use +// the same mutex) to race with the actual internal component cluster state switch-over. +// Ensure we detect and handle this by bouncing the request back to the distributor. +// It's for all intents and purposes guaranteed that the internal state has converged +// once the distributor has gotten around to retrying the operation. +TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) { + ConcurrentOperationFixture f(*this); + + // Make manager-internal and component-internal version state inconsistent + f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1")); + _manager->onDown(std::make_shared(lib::ClusterState("version:3 distributor:1 storage:1"))); + + // Info command is sent with state version 2, which mismatches that of internal state 3 + // even though it's the same as the component's current version. + _top->sendDown(f.createFullFetchCommand()); + + auto replies = f.awaitAndGetReplies(1); + auto& reply = dynamic_cast(*replies[0]); + EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); +} + } // storage -- cgit v1.2.3