summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor')
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp85
-rw-r--r--storage/src/tests/distributor/distributor_message_sender_stub.h5
-rw-r--r--storage/src/tests/distributor/distributortest.cpp9
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp13
-rw-r--r--storage/src/tests/distributor/distributortestutil.h6
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp76
-rw-r--r--storage/src/tests/distributor/maintenancemocks.h2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp28
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp18
9 files changed, 166 insertions, 76 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 5203fec2462..861f8e72832 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storage/distributor/blockingoperationstarter.h>
+#include <vespa/storage/distributor/distributor_stripe_operation_context.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/maintenancemocks.h>
@@ -13,6 +14,86 @@ using namespace ::testing;
namespace storage::distributor {
+struct FakeDistributorStripeOperationContext : public DistributorStripeOperationContext {
+
+ PendingMessageTracker& _message_tracker;
+
+ explicit FakeDistributorStripeOperationContext(PendingMessageTracker& message_tracker)
+ : _message_tracker(message_tracker)
+ {}
+
+ ~FakeDistributorStripeOperationContext() override = default;
+
+ // From DistributorOperationContext:
+ api::Timestamp generate_unique_timestamp() override {
+ abort();
+ }
+ const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
+ abort();
+ }
+ DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
+ abort();
+ }
+ const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
+ abort();
+ }
+ DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
+ abort();
+ }
+ const DistributorConfiguration& distributor_config() const noexcept override {
+ abort();
+ }
+ // From DistributorStripeOperationContext:
+ void update_bucket_database(const document::Bucket&, const BucketCopy&, uint32_t) override {
+ abort();
+ }
+ void update_bucket_database(const document::Bucket&, const std::vector<BucketCopy>&, uint32_t) override {
+ abort();
+ }
+ void remove_node_from_bucket_database(const document::Bucket&, uint16_t) override {
+ abort();
+ }
+ void remove_nodes_from_bucket_database(const document::Bucket&, const std::vector<uint16_t>&) override {
+ abort();
+ }
+ document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId&) const override {
+ abort();
+ }
+ void recheck_bucket_info(uint16_t, const document::Bucket&) override {
+ abort();
+ }
+ document::BucketId get_sibling(const document::BucketId&) const override {
+ abort();
+ }
+ void send_inline_split_if_bucket_too_large(document::BucketSpace, const BucketDatabase::Entry&, uint8_t) override {
+ abort();
+ }
+ OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override {
+ abort();
+ }
+ PendingMessageTracker& pending_message_tracker() noexcept override {
+ return _message_tracker;
+ }
+ const PendingMessageTracker& pending_message_tracker() const noexcept override {
+ return _message_tracker;
+ }
+ bool has_pending_message(uint16_t, const document::Bucket&, uint32_t) const override {
+ abort();
+ }
+ const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace&) const override {
+ abort();
+ }
+ const lib::ClusterStateBundle& cluster_state_bundle() const override {
+ abort();
+ }
+ bool storage_node_is_up(document::BucketSpace, uint32_t) const override {
+ abort();
+ }
+ const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
+ abort();
+ }
+};
+
struct BlockingOperationStarterTest : Test {
std::shared_ptr<Operation> createMockOperation() {
return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1)));
@@ -27,6 +108,7 @@ struct BlockingOperationStarterTest : Test {
std::unique_ptr<MockOperationStarter> _starterImpl;
std::unique_ptr<StorageComponentRegisterImpl> _compReg;
std::unique_ptr<PendingMessageTracker> _messageTracker;
+ std::unique_ptr<FakeDistributorStripeOperationContext> _fake_ctx;
std::unique_ptr<OperationSequencer> _operation_sequencer;
std::unique_ptr<BlockingOperationStarter> _operationStarter;
@@ -41,8 +123,9 @@ BlockingOperationStarterTest::SetUp()
_compReg->setClock(_clock);
_clock.setAbsoluteTimeInSeconds(1);
_messageTracker = std::make_unique<PendingMessageTracker>(*_compReg);
+ _fake_ctx = std::make_unique<FakeDistributorStripeOperationContext>(*_messageTracker);
_operation_sequencer = std::make_unique<OperationSequencer>();
- _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl);
+ _operationStarter = std::make_unique<BlockingOperationStarter>(*_fake_ctx, *_operation_sequencer, *_starterImpl);
}
TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) {
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h
index 59a5a82b7df..18662fbce8f 100644
--- a/storage/src/tests/distributor/distributor_message_sender_stub.h
+++ b/storage/src/tests/distributor/distributor_message_sender_stub.h
@@ -87,6 +87,11 @@ public:
return dummy_cluster_context;
}
+ distributor::PendingMessageTracker& getPendingMessageTracker() override {
+ assert(_pending_message_tracker);
+ return *_pending_message_tracker;
+ }
+
const distributor::PendingMessageTracker& getPendingMessageTracker() const override {
assert(_pending_message_tracker);
return *_pending_message_tracker;
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 3d1c6165946..9c3686d3614 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -222,7 +222,6 @@ struct DistributorTest : Test, DistributorTestUtil {
}
void configureMaxClusterClockSkew(int seconds);
- void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
void sendDownDummyRemoveCommand();
void assertSingleBouncedRemoveReplyPresent();
@@ -795,12 +794,6 @@ auto make_dummy_get_command_for_bucket_1() {
}
-void DistributorTest::sendDownClusterStateCommand() {
- lib::ClusterState newState("bits:1 storage:1 distributor:1");
- auto stateCmd = std::make_shared<api::SetSystemStateCommand>(newState);
- _distributor->handleMessage(stateCmd);
-}
-
void DistributorTest::replyToSingleRequestBucketInfoCommandWith1Bucket() {
ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size());
for (uint32_t i = 0; i < _sender.commands().size(); ++i) {
@@ -847,7 +840,7 @@ TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) {
getClock().setAbsoluteTimeInSeconds(1000);
configureMaxClusterClockSkew(10);
- sendDownClusterStateCommand();
+ receive_set_system_state_command("bits:1 storage:1 distributor:1");
ASSERT_NO_FATAL_FAILURE(replyToSingleRequestBucketInfoCommandWith1Bucket());
// SetSystemStateCommand sent down chain at this point.
sendDownDummyRemoveCommand();
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index a2f32d8faa2..3ec1c95b206 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -101,6 +101,19 @@ DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr)
}
void
+DistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str)
+{
+ auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ _distributor->handleMessage(state_cmd); // TODO move semantics
+}
+
+void
+DistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _distributor->handleMessage(msg);
+}
+
+void
DistributorTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
{
_node->getComponentRegister().setDocumentTypeRepo(repo);
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 6664b8d823d..533fd49811f 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -202,6 +202,12 @@ public:
void setSystemState(const lib::ClusterState& systemState);
+ // Invokes full cluster state transition pipeline rather than directly applying
+ // the state and just pretending everything has been completed.
+ void receive_set_system_state_command(const vespalib::string& state_str);
+
+ void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg);
+
// Must be called prior to createLinks() to have any effect
void set_num_distributor_stripes(uint32_t n_stripes) noexcept {
_num_distributor_stripes = n_stripes;
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index 0a36e5cd0e5..e38e4b5b668 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -40,18 +40,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil {
bool checkBlock(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlock(bucket, tracker, op_seq);
+ return op.checkBlock(bucket, ctx, op_seq);
}
bool checkBlockForAllNodes(const IdealStateOperation& op,
const document::Bucket& bucket,
- const PendingMessageTracker& tracker,
+ const DistributorStripeOperationContext& ctx,
const OperationSequencer& op_seq) const
{
- return op.checkBlockForAllNodes(bucket, tracker, op_seq);
+ return op.checkBlockForAllNodes(bucket, ctx, op_seq);
}
std::vector<document::BucketSpace> _bucketSpaces;
@@ -170,92 +170,86 @@ TEST_F(IdealStateManagerTest, recheck_when_active) {
active_ideal_state_operations());
}
-TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) {
+/**
+ * Don't schedule ideal state operations when there's a pending cluster state.
+ * This subsumes the legacy behavior of blocking ideal state ops when there is a
+ * zero-bucket RequestBucketInfoCommand pending towards a node (i.e. full bucket
+ * info fetch).
+ *
+ * This is for two reasons:
+ * - Avoids race conditions where we change the bucket set concurrently with
+ * requesting bucket info.
+ * - Once we get updated bucket info it's likely that the set of ideal state ops
+ * to execute will change anyway, so it makes sense to wait until it's ready.
+ */
+TEST_F(IdealStateManagerTest, block_ideal_state_ops_when_pending_cluster_state_is_present) {
+
+ setupDistributor(2, 10, "version:1 distributor:1 storage:1 .0.s:d");
+
+ // Trigger a pending cluster state with bucket info requests towards 1 node
+ receive_set_system_state_command("version:2 distributor:1 storage:1");
- setupDistributor(2, 10, "distributor:1 storage:2");
-
- framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
-
document::BucketId bid(16, 1234);
- std::vector<document::BucketId> buckets;
-
- // RequestBucketInfoCommand does not have a specific bucketid since it's
- // sent to the entire node. It will then use a null bucketid.
- {
- auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), buckets);
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
- }
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4)));
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
- }
-
- {
- // Don't trigger on requests to other nodes.
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
- // Don't block on null-bucket messages that aren't RequestBucketInfo.
- {
- auto msg = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "baz");
- msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 7));
- tracker.insert(msg);
+ // Clear pending by replying with zero buckets for all bucket spaces
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); ++i) {
+ auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i));
+ handle_top_level_message(bucket_req.makeReply());
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
document::BucketId bid(16, 1234);
{
auto msg = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(bid));
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
{
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
// Not blocked for exact node match.
- EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
- EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), operation_context(), op_seq));
}
}
TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) {
setupDistributor(2, 10, "distributor:1 storage:2");
framework::defaultimplementation::FakeClock clock;
- PendingMessageTracker tracker(_node->getComponentRegister());
OperationSequencer op_seq;
const auto bucket = makeDocumentBucket(document::BucketId(16, 1234));
{
auto msg = std::make_shared<api::JoinBucketsCommand>(bucket);
msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1));
- tracker.insert(msg);
+ pending_message_tracker().insert(msg);
}
auto token = op_seq.try_acquire(bucket, "foo");
EXPECT_TRUE(token.valid());
{
RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(bucket, toVector<uint16_t>(0)));
- EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq));
- EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq));
+ EXPECT_TRUE(checkBlock(op, bucket, operation_context(), op_seq));
+ EXPECT_TRUE(checkBlockForAllNodes(op, bucket, operation_context(), op_seq));
}
}
diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h
index fff798d4413..1245c9bb15d 100644
--- a/storage/src/tests/distributor/maintenancemocks.h
+++ b/storage/src/tests/distributor/maintenancemocks.h
@@ -51,7 +51,7 @@ public:
}
void onStart(DistributorStripeMessageSender&) override {}
void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {}
- bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override {
+ bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override {
return _shouldBlock;
}
void setShouldBlock(bool shouldBlock) {
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 1026ea2855e..52a8bfc41b6 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -18,13 +18,11 @@ using namespace ::testing;
namespace storage::distributor {
struct MergeOperationTest : Test, DistributorTestUtil {
- std::unique_ptr<PendingMessageTracker> _pendingTracker;
OperationSequencer _operation_sequencer;
void SetUp() override {
createLinks();
- _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister());
- _sender.setPendingMessageTracker(*_pendingTracker);
+ _sender.setPendingMessageTracker(pending_message_tracker());
_sender.set_operation_sequencer(_operation_sequencer);
}
@@ -256,7 +254,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
vespalib::string storage("storage");
msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
- _pendingTracker->insert(msg);
+ pending_message_tracker().insert(msg);
sendReply(op);
// Should not be a remove here!
@@ -400,19 +398,19 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) {
op.setIdealStateManager(&getIdealStateManager());
// Should not block on nodes _not_ included in operation node set
- _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(3, std::chrono::seconds(10));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
// Node 1 is included in operation node set and should cause a block
- _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
getClock().addSecondsToTime(11);
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // No longer busy
// Should block on other operation nodes than the first listed as well
- _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(1, std::chrono::seconds(10));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
}
@@ -426,8 +424,8 @@ TEST_F(MergeOperationTest, global_bucket_merges_are_not_blocked_by_busy_nodes) {
op.setIdealStateManager(&getIdealStateManager());
// Node 1 is included in operation node set but should not cause a block of global bucket merge
- _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
}
TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
@@ -437,10 +435,10 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
op.setIdealStateManager(&getIdealStateManager());
- EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo");
EXPECT_TRUE(token.valid());
- EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer));
}
TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index 8c8da1bb197..ec58992ed3e 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -261,7 +261,6 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
framework::defaultimplementation::FakeClock clock;
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
- PendingMessageTracker tracker(compReg);
OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -274,7 +273,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
joinCmd->getSourceBuckets() = joinSources;
joinCmd->setAddress(_Storage0Address);
- tracker.insert(joinCmd);
+ pending_message_tracker().insert(joinCmd);
insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, true);
@@ -284,18 +283,18 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
splitCount,
splitByteSize);
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
// Now, pretend there's a join for another node in the same bucket. This
// will happen when a join is partially completed.
- tracker.clearMessagesForNode(0);
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ pending_message_tracker().clearMessagesForNode(0);
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
joinCmd->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(),
lib::NodeType::STORAGE, 1));
- tracker.insert(joinCmd);
+ pending_message_tracker().insert(joinCmd);
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
@@ -303,7 +302,6 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
framework::defaultimplementation::FakeClock clock;
compReg.setClock(clock);
clock.setAbsoluteTimeInSeconds(1);
- PendingMessageTracker tracker(compReg);
OperationSequencer op_seq;
enableDistributorClusterState("distributor:1 storage:2");
@@ -314,10 +312,10 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
maxSplitBits, splitCount, splitByteSize);
- EXPECT_FALSE(op.isBlocked(tracker, op_seq));
+ EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo");
EXPECT_TRUE(token.valid());
- EXPECT_TRUE(op.isBlocked(tracker, op_seq));
+ EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
} // storage::distributor