diff options
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/mergeoperationtest.cpp | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 75faddbe667..90fcf40b3fe 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/distributortestutil.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/gtest/gtest.h> @@ -17,11 +18,13 @@ namespace storage::distributor { struct MergeOperationTest : Test, DistributorTestUtil { std::unique_ptr<PendingMessageTracker> _pendingTracker; + OperationSequencer _operation_sequencer; void SetUp() override { createLinks(); _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister()); _sender.setPendingMessageTracker(*_pendingTracker); + _sender.set_operation_sequencer(_operation_sequencer); } void TearDown() override { @@ -250,7 +253,8 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { // at will. auto msg = std::make_shared<api::SetBucketStateCommand>( makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE); - msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1)); + vespalib::string storage("storage"); + msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1)); _pendingTracker->insert(msg); sendReply(op); @@ -396,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) { // Should not block on nodes _not_ included in operation node set _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // Node 1 is included in operation node set and should cause a block _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); getClock().addSecondsToTime(11); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy // Should block on other operation nodes than the first listed as well _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); +} + +TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + enableDistributorClusterState("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo"); + EXPECT_TRUE(token.valid()); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); } TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { |