diff options
Diffstat (limited to 'storage/src/tests/distributor')
9 files changed, 166 insertions, 76 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 5203fec2462..861f8e72832 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/distributor/blockingoperationstarter.h> +#include <vespa/storage/distributor/distributor_stripe_operation_context.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/maintenancemocks.h> @@ -13,6 +14,86 @@ using namespace ::testing; namespace storage::distributor { +struct FakeDistributorStripeOperationContext : public DistributorStripeOperationContext { + + PendingMessageTracker& _message_tracker; + + explicit FakeDistributorStripeOperationContext(PendingMessageTracker& message_tracker) + : _message_tracker(message_tracker) + {} + + ~FakeDistributorStripeOperationContext() override = default; + + // From DistributorOperationContext: + api::Timestamp generate_unique_timestamp() override { + abort(); + } + const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override { + abort(); + } + DistributorBucketSpaceRepo& bucket_space_repo() noexcept override { + abort(); + } + const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override { + abort(); + } + DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override { + abort(); + } + const DistributorConfiguration& distributor_config() const noexcept override { + abort(); + } + // From DistributorStripeOperationContext: + void update_bucket_database(const document::Bucket&, const BucketCopy&, uint32_t) override { + abort(); + } + void update_bucket_database(const document::Bucket&, const std::vector<BucketCopy>&, uint32_t) override { + abort(); + } + void remove_node_from_bucket_database(const document::Bucket&, uint16_t) override { + abort(); + } + void remove_nodes_from_bucket_database(const document::Bucket&, const std::vector<uint16_t>&) override { + abort(); + } + document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId&) const override { + abort(); + } + void recheck_bucket_info(uint16_t, const document::Bucket&) override { + abort(); + } + document::BucketId get_sibling(const document::BucketId&) const override { + abort(); + } + void send_inline_split_if_bucket_too_large(document::BucketSpace, const BucketDatabase::Entry&, uint8_t) override { + abort(); + } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override { + abort(); + } + PendingMessageTracker& pending_message_tracker() noexcept override { + return _message_tracker; + } + const PendingMessageTracker& pending_message_tracker() const noexcept override { + return _message_tracker; + } + bool has_pending_message(uint16_t, const document::Bucket&, uint32_t) const override { + abort(); + } + const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace&) const override { + abort(); + } + const lib::ClusterStateBundle& cluster_state_bundle() const override { + abort(); + } + bool storage_node_is_up(document::BucketSpace, uint32_t) const override { + abort(); + } + const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override { + abort(); + } +}; + struct BlockingOperationStarterTest : Test { std::shared_ptr<Operation> createMockOperation() { return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1))); @@ -27,6 +108,7 @@ struct BlockingOperationStarterTest : Test { std::unique_ptr<MockOperationStarter> _starterImpl; std::unique_ptr<StorageComponentRegisterImpl> _compReg; std::unique_ptr<PendingMessageTracker> _messageTracker; + std::unique_ptr<FakeDistributorStripeOperationContext> _fake_ctx; std::unique_ptr<OperationSequencer> _operation_sequencer; std::unique_ptr<BlockingOperationStarter> _operationStarter; @@ -41,8 +123,9 @@ BlockingOperationStarterTest::SetUp() _compReg->setClock(_clock); _clock.setAbsoluteTimeInSeconds(1); _messageTracker = std::make_unique<PendingMessageTracker>(*_compReg); + _fake_ctx = std::make_unique<FakeDistributorStripeOperationContext>(*_messageTracker); _operation_sequencer = std::make_unique<OperationSequencer>(); - _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl); + _operationStarter = std::make_unique<BlockingOperationStarter>(*_fake_ctx, *_operation_sequencer, *_starterImpl); } TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) { diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h index 59a5a82b7df..18662fbce8f 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.h +++ b/storage/src/tests/distributor/distributor_message_sender_stub.h @@ -87,6 +87,11 @@ public: return dummy_cluster_context; } + distributor::PendingMessageTracker& getPendingMessageTracker() override { + assert(_pending_message_tracker); + return *_pending_message_tracker; + } + const distributor::PendingMessageTracker& getPendingMessageTracker() const override { assert(_pending_message_tracker); return *_pending_message_tracker; diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 3d1c6165946..9c3686d3614 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -222,7 +222,6 @@ struct DistributorTest : Test, DistributorTestUtil { } void configureMaxClusterClockSkew(int seconds); - void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); void sendDownDummyRemoveCommand(); void assertSingleBouncedRemoveReplyPresent(); @@ -795,12 +794,6 @@ auto make_dummy_get_command_for_bucket_1() { } -void DistributorTest::sendDownClusterStateCommand() { - lib::ClusterState newState("bits:1 storage:1 distributor:1"); - auto stateCmd = std::make_shared<api::SetSystemStateCommand>(newState); - _distributor->handleMessage(stateCmd); -} - void DistributorTest::replyToSingleRequestBucketInfoCommandWith1Bucket() { ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); for (uint32_t i = 0; i < _sender.commands().size(); ++i) { @@ -847,7 +840,7 @@ TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) { getClock().setAbsoluteTimeInSeconds(1000); configureMaxClusterClockSkew(10); - sendDownClusterStateCommand(); + receive_set_system_state_command("bits:1 storage:1 distributor:1"); ASSERT_NO_FATAL_FAILURE(replyToSingleRequestBucketInfoCommandWith1Bucket()); // SetSystemStateCommand sent down chain at this point. sendDownDummyRemoveCommand(); diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index a2f32d8faa2..3ec1c95b206 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -101,6 +101,19 @@ DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr) } void +DistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + _distributor->handleMessage(state_cmd); // TODO move semantics +} + +void +DistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + _distributor->handleMessage(msg); +} + +void DistributorTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo) { _node->getComponentRegister().setDocumentTypeRepo(repo); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 6664b8d823d..533fd49811f 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -202,6 +202,12 @@ public: void setSystemState(const lib::ClusterState& systemState); + // Invokes full cluster state transition pipeline rather than directly applying + // the state and just pretending everything has been completed. + void receive_set_system_state_command(const vespalib::string& state_str); + + void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + // Must be called prior to createLinks() to have any effect void set_num_distributor_stripes(uint32_t n_stripes) noexcept { _num_distributor_stripes = n_stripes; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 0a36e5cd0e5..e38e4b5b668 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -40,18 +40,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return op.checkBlock(bucket, tracker, op_seq); + return op.checkBlock(bucket, ctx, op_seq); } bool checkBlockForAllNodes(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return op.checkBlockForAllNodes(bucket, tracker, op_seq); + return op.checkBlockForAllNodes(bucket, ctx, op_seq); } std::vector<document::BucketSpace> _bucketSpaces; @@ -170,92 +170,86 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { active_ideal_state_operations()); } -TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { +/** + * Don't schedule ideal state operations when there's a pending cluster state. + * This subsumes the legacy behavior of blocking ideal state ops when there is a + * zero-bucket RequestBucketInfoCommand pending towards a node (i.e. full bucket + * info fetch). + * + * This is for two reasons: + * - Avoids race conditions where we change the bucket set concurrently with + * requesting bucket info. + * - Once we get updated bucket info it's likely that the set of ideal state ops + * to execute will change anyway, so it makes sense to wait until it's ready. + */ +TEST_F(IdealStateManagerTest, block_ideal_state_ops_when_pending_cluster_state_is_present) { + + setupDistributor(2, 10, "version:1 distributor:1 storage:1 .0.s:d"); + + // Trigger a pending cluster state with bucket info requests towards 1 node + receive_set_system_state_command("version:2 distributor:1 storage:1"); - setupDistributor(2, 10, "distributor:1 storage:2"); - - framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; - document::BucketId bid(16, 1234); - std::vector<document::BucketId> buckets; - - // RequestBucketInfoCommand does not have a specific bucketid since it's - // sent to the entire node. It will then use a null bucketid. - { - auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), buckets); - msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4)); - tracker.insert(msg); - } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4))); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); - } - - { - // Don't trigger on requests to other nodes. - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5))); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } - // Don't block on null-bucket messages that aren't RequestBucketInfo. - { - auto msg = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "baz"); - msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 7)); - tracker.insert(msg); + // Clear pending by replying with zero buckets for all bucket spaces + ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); ++i) { + auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i)); + handle_top_level_message(bucket_req.makeReply()); } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); } } TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; document::BucketId bid(16, 1234); { auto msg = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(bid)); msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4)); - tracker.insert(msg); + pending_message_tracker().insert(msg); } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq)); + EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! - EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), operation_context(), op_seq)); } } TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; const auto bucket = makeDocumentBucket(document::BucketId(16, 1234)); { auto msg = std::make_shared<api::JoinBucketsCommand>(bucket); msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1)); - tracker.insert(msg); + pending_message_tracker().insert(msg); } auto token = op_seq.try_acquire(bucket, "foo"); EXPECT_TRUE(token.valid()); { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(bucket, toVector<uint16_t>(0))); - EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq)); - EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq)); + EXPECT_TRUE(checkBlock(op, bucket, operation_context(), op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, bucket, operation_context(), op_seq)); } } diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index fff798d4413..1245c9bb15d 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -51,7 +51,7 @@ public: } void onStart(DistributorStripeMessageSender&) override {} void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {} - bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override { + bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override { return _shouldBlock; } void setShouldBlock(bool shouldBlock) { diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 1026ea2855e..52a8bfc41b6 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -18,13 +18,11 @@ using namespace ::testing; namespace storage::distributor { struct MergeOperationTest : Test, DistributorTestUtil { - std::unique_ptr<PendingMessageTracker> _pendingTracker; OperationSequencer _operation_sequencer; void SetUp() override { createLinks(); - _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister()); - _sender.setPendingMessageTracker(*_pendingTracker); + _sender.setPendingMessageTracker(pending_message_tracker()); _sender.set_operation_sequencer(_operation_sequencer); } @@ -256,7 +254,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE); vespalib::string storage("storage"); msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1)); - _pendingTracker->insert(msg); + pending_message_tracker().insert(msg); sendReply(op); // Should not be a remove here! @@ -400,19 +398,19 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) { op.setIdealStateManager(&getIdealStateManager()); // Should not block on nodes _not_ included in operation node set - _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(3, std::chrono::seconds(10)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // Node 1 is included in operation node set and should cause a block - _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); getClock().addSecondsToTime(11); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // No longer busy // Should block on other operation nodes than the first listed as well - _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(1, std::chrono::seconds(10)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); } @@ -426,8 +424,8 @@ TEST_F(MergeOperationTest, global_bucket_merges_are_not_blocked_by_busy_nodes) { op.setIdealStateManager(&getIdealStateManager()); // Node 1 is included in operation node set but should not cause a block of global bucket merge - _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); } TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { @@ -437,10 +435,10 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo"); EXPECT_TRUE(token.valid()); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); } TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index 8c8da1bb197..ec58992ed3e 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -261,7 +261,6 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); - PendingMessageTracker tracker(compReg); OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -274,7 +273,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { joinCmd->getSourceBuckets() = joinSources; joinCmd->setAddress(_Storage0Address); - tracker.insert(joinCmd); + pending_message_tracker().insert(joinCmd); insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, true); @@ -284,18 +283,18 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { splitCount, splitByteSize); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); // Now, pretend there's a join for another node in the same bucket. This // will happen when a join is partially completed. - tracker.clearMessagesForNode(0); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + pending_message_tracker().clearMessagesForNode(0); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); joinCmd->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1)); - tracker.insert(joinCmd); + pending_message_tracker().insert(joinCmd); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { @@ -303,7 +302,6 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); - PendingMessageTracker tracker(compReg); OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -314,10 +312,10 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)), maxSplitBits, splitCount, splitByteSize); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo"); EXPECT_TRUE(token.valid()); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } } // storage::distributor |