aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/bucketdb/bucketmanagertest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-03 15:58:38 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-03 15:58:38 +0000
commit1725ce1df537041cc4894be60b16f247af241c9a (patch)
tree2c11f313bd2d184201a3adafef864f88c62eb329 /storage/src/tests/bucketdb/bucketmanagertest.cpp
parent5d72bdae4df1e699202bc2f38212e763018036cd (diff)
Guard against processing bucket requests with inconsistent internal state version
There's a tiny window of time between when the bucket manager observes a new state version and when the state version actually is visible in the rest of the process. We must ensure that we don't end up processing requests when these two differ, or we might erroneously process requests for version X using a state only valid for version Y < X.
Diffstat (limited to 'storage/src/tests/bucketdb/bucketmanagertest.cpp')
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp45
1 files changed, 37 insertions, 8 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index e14bb8e11d1..b83ccccbf48 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -556,7 +556,7 @@ class ConcurrentOperationFixture {
public:
explicit ConcurrentOperationFixture(BucketManagerTest& self)
: _self(self),
- _state("distributor:1 storage:1")
+ _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1"))
{
_self.setupTestEnvironment();
_self._top->open();
@@ -566,9 +566,7 @@ public:
// Need a cluster state to work with initially, so that processing
// bucket requests can calculate a target distributor.
- _self._node->setClusterState(_state);
- _self._manager->onDown(
- std::make_shared<api::SetSystemStateCommand>(_state));
+ updater_internal_cluster_state_with_current();
}
void setUp(const WithBuckets& buckets) {
@@ -577,6 +575,16 @@ public:
}
}
+ void updater_internal_cluster_state_with_current() {
+ _self._node->setClusterState(*_state);
+ _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+ }
+
+ void update_cluster_state(const lib::ClusterState& state) {
+ _state = std::make_shared<lib::ClusterState>(state);
+ updater_internal_cluster_state_with_current();
+ }
+
auto acquireBucketLock(const document::BucketId& bucket) {
return _self._node->getStorageBucketDatabase().get(bucket, "foo");
}
@@ -610,15 +618,15 @@ public:
}
auto createFullFetchCommand() const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state);
}
auto createFullFetchCommandWithHash(vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash);
}
auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash);
}
auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) {
@@ -726,7 +734,7 @@ group[2].nodes[2].index 5
private:
BucketManagerTest& _self;
- lib::ClusterState _state;
+ std::shared_ptr<lib::ClusterState> _state;
};
TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) {
@@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED
}
+// It's possible for the request processing thread and onSetSystemState (which use
+// the same mutex) to race with the actual internal component cluster state switch-over.
+// Ensure we detect and handle this by bouncing the request back to the distributor.
+// It's for all intents and purposes guaranteed that the internal state has converged
+// once the distributor has gotten around to retrying the operation.
+TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) {
+ ConcurrentOperationFixture f(*this);
+
+ // Make manager-internal and component-internal version state inconsistent
+ f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+ _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1")));
+
+ // Info command is sent with state version 2, which mismatches that of internal state 3
+ // even though it's the same as the component's current version.
+ _top->sendDown(f.createFullFetchCommand());
+
+ auto replies = f.awaitAndGetReplies(1);
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+}
+
} // storage