summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/bucketdb/bucketmanagertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/bucketdb/bucketmanagertest.cpp')
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp45
1 files changed, 37 insertions, 8 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index e14bb8e11d1..b83ccccbf48 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -556,7 +556,7 @@ class ConcurrentOperationFixture {
public:
explicit ConcurrentOperationFixture(BucketManagerTest& self)
: _self(self),
- _state("distributor:1 storage:1")
+ _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1"))
{
_self.setupTestEnvironment();
_self._top->open();
@@ -566,9 +566,7 @@ public:
// Need a cluster state to work with initially, so that processing
// bucket requests can calculate a target distributor.
- _self._node->setClusterState(_state);
- _self._manager->onDown(
- std::make_shared<api::SetSystemStateCommand>(_state));
+ updater_internal_cluster_state_with_current();
}
void setUp(const WithBuckets& buckets) {
@@ -577,6 +575,16 @@ public:
}
}
+ void updater_internal_cluster_state_with_current() {
+ _self._node->setClusterState(*_state);
+ _self._manager->onDown(std::make_shared<api::SetSystemStateCommand>(*_state));
+ }
+
+ void update_cluster_state(const lib::ClusterState& state) {
+ _state = std::make_shared<lib::ClusterState>(state);
+ updater_internal_cluster_state_with_current();
+ }
+
auto acquireBucketLock(const document::BucketId& bucket) {
return _self._node->getStorageBucketDatabase().get(bucket, "foo");
}
@@ -610,15 +618,15 @@ public:
}
auto createFullFetchCommand() const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state);
}
auto createFullFetchCommandWithHash(vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), 0, *_state, hash);
}
auto createFullFetchCommandWithHash(document::BucketSpace space, vespalib::stringref hash) const {
- return std::make_shared<api::RequestBucketInfoCommand>(space, 0, _state, hash);
+ return std::make_shared<api::RequestBucketInfoCommand>(space, 0, *_state, hash);
}
auto acquireBucketLockAndSendInfoRequest(const document::BucketId& bucket) {
@@ -726,7 +734,7 @@ group[2].nodes[2].index 5
private:
BucketManagerTest& _self;
- lib::ClusterState _state;
+ std::shared_ptr<lib::ClusterState> _state;
};
TEST_F(BucketManagerTest, split_reply_ordered_after_bucket_reply) {
@@ -1214,4 +1222,25 @@ TEST_F(BucketManagerTest, fall_back_to_legacy_global_distribution_hash_on_mismat
EXPECT_EQ(api::ReturnCode::OK, reply.getResult().getResult()); // _not_ REJECTED
}
+// It's possible for the request processing thread and onSetSystemState (which use
+// the same mutex) to race with the actual internal component cluster state switch-over.
+// Ensure we detect and handle this by bouncing the request back to the distributor.
+// It's for all intents and purposes guaranteed that the internal state has converged
+// once the distributor has gotten around to retrying the operation.
+TEST_F(BucketManagerTest, bounce_request_on_internal_cluster_state_version_mismatch) {
+ ConcurrentOperationFixture f(*this);
+
+ // Make manager-internal and component-internal version state inconsistent
+ f.update_cluster_state(lib::ClusterState("version:2 distributor:1 storage:1"));
+ _manager->onDown(std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("version:3 distributor:1 storage:1")));
+
+ // Info command is sent with state version 2, which mismatches that of internal state 3
+ // even though it's the same as the component's current version.
+ _top->sendDown(f.createFullFetchCommand());
+
+ auto replies = f.awaitAndGetReplies(1);
+ auto& reply = dynamic_cast<api::RequestBucketInfoReply&>(*replies[0]);
+ EXPECT_EQ(api::ReturnCode::REJECTED, reply.getResult().getResult());
+}
+
} // storage