summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/idealstatemanagertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/idealstatemanagertest.cpp')
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp76
1 files changed, 35 insertions, 41 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 0a36e5cd0e5..e38e4b5b668 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -40,18 +40,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlock(bucket, tracker, op_seq);
+ return op.checkBlock(bucket, ctx, op_seq);
}
bool checkBlockForAllNodes(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlockForAllNodes(bucket, tracker, op_seq);
+ return op.checkBlockForAllNodes(bucket, ctx, op_seq);
}
std::vector<document::BucketSpace> _bucketSpaces;
@@ -170,92 +170,86 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
active_ideal_state_operations());
}
-TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
+/**
+ * Don't schedule ideal state operations when there's a pending cluster state.
+ * This subsumes the legacy behavior of blocking ideal state ops when there is a
+ * zero-bucket RequestBucketInfoCommand pending towards a node (i.e. full bucket
+ * info fetch).
+ *
+ * This is for two reasons:
+ * - Avoids race conditions where we change the bucket set concurrently with
+ * requesting bucket info.
+ * - Once we get updated bucket info it's likely that the set of ideal state ops
+ * to execute will change anyway, so it makes sense to wait until it's ready.
+ */
+TEST_F(IdealStateManagerTest, block_ideal_state_ops_when_pending_cluster_state_is_present) {
+
+ setupDistributor(2, 10, "version:1 distributor:1 storage:1 .0.s:d");
+
+ // Trigger a pending cluster state with bucket info requests towards 1 node
+ receive_set_system_state_command("version:2 distributor:1 storage:1");
- setupDistributor(2, 10, "distributor:1 storage:2");
-
- framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
-
document::BucketId bid(16, 1234);
- std::vector<document::BucketId> buckets;
-
- // RequestBucketInfoCommand does not have a specific bucketid since it's
- // sent to the entire node. It will then use a null bucketid.
- {
- auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), buckets);
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
- }
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
- }
-
- {
- // Don't trigger on requests to other nodes.
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
- // Don't block on null-bucket messages that aren't RequestBucketInfo.
- {
- auto msg = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "baz");
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 7));
- tracker.insert(msg);
+ // Clear pending by replying with zero buckets for all bucket spaces
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); ++i) {
+ auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i));
+ handle_top_level_message(bucket_req.makeReply());
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
document::BucketId bid(16, 1234);
{
auto msg = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(bid));
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
- EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
const auto bucket = makeDocumentBucket(document::BucketId(16, 1234));
{
auto msg = std::make_shared<api::JoinBucketsCommand>(bucket);
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
auto token = op_seq.try_acquire(bucket, "foo");
EXPECT_TRUE(token.valid());
{
RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(bucket, toVector<uint16_t>(0)));
- EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq));
- EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq));
+ EXPECT_TRUE(checkBlock(op, bucket, operation_context(), op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, bucket, operation_context(), op_seq));
}
}