diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-03-03 15:58:38 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-03-03 15:58:38 +0000 |
commit | 1725ce1df537041cc4894be60b16f247af241c9a (patch) | |
tree | 2c11f313bd2d184201a3adafef864f88c62eb329 /storage/src/tests/bucketdb/bucketmanagertest.cpp | |
parent | 5d72bdae4df1e699202bc2f38212e763018036cd (diff) |
Guard against processing bucket requests with inconsistent internal state version
There's a tiny window of time between when the bucket manager observes a new
state version and when the state version actually is visible in the rest of
the process. We must ensure that we don't end up processing requests when
these two differ, or we might erroneously process requests for version X
using a state only valid for version Y < X.
Diffstat (limited to 'storage/src/tests/bucketdb/bucketmanagertest.cpp')
-rw-r--r-- | storage/src/tests/bucketdb/bucketmanagertest.cpp | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index e14bb8e11d1..b83ccccbf48 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -556,7 +556,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state("distributor:1 storage:1") + _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); @@ -566,9 +566,7 @@ public: // Need a cluster state to work with initially, so that processing // bucket requests can calculate a target distributor. - _self._node->setClusterState(_state); - _self._manager->onDown( - std::make_shared<api::SetSystemStateCommand>(_state)); + updater_internal_cluster_state_with_current(); } void setUp(const WithBuckets& buckets) { @@ -577,6 +575,16 @@ public: } } + void updater_internal_cluster_state_with_current() { + _self._node->setClusterState(*_state); + _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state)); + } + + void update_cluster_state(const lib::ClusterState& state) { + _state = std::make_shared<lib::ClusterState>(state); + updater_internal_cluster_state_with_current(); + } + auto acquireBucketLock(const document::BucketId& bucket) { return _self._node->getStorageBucketDatabase().get(bucket, "foo"); } @@ -610,15 +618,15 @@ public: } auto createFullFetchCommand() const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state); } auto createFullFetchCommandWithHash(vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash); } auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const { - return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash); + return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash); } auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) { @@ -726,7 +734,7 @@ group[2].nodes[2].index 5 private: BucketManagerTest& _self; - lib::ClusterState _state; + std::shared_ptr<lib::ClusterState> _state; }; TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) { @@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED } +// It's possible for the request processing thread and onSetSystemState (which use +// the same mutex) to race with the actual internal component cluster state switch-over. +// Ensure we detect and handle this by bouncing the request back to the distributor. +// It's for all intents and purposes guaranteed that the internal state has converged +// once the distributor has gotten around to retrying the operation. +TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) { + ConcurrentOperationFixture f(*this); + + // Make manager-internal and component-internal version state inconsistent + f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1")); + _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1"))); + + // Info command is sent with state version 2, which mismatches that of internal state 3 + // even though it's the same as the component's current version. + _top->sendDown(f.createFullFetchCommand()); + + auto replies = f.awaitAndGetReplies(1); + auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]); + EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult()); +} + } // storage |