aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/mergeoperationtest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp27
1 files changed, 22 insertions, 5 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 75faddbe667..90fcf40b3fe 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -17,11 +18,13 @@ namespace storage::distributor {
struct MergeOperationTest : Test, DistributorTestUtil {
std::unique_ptr<PendingMessageTracker> _pendingTracker;
+ OperationSequencer _operation_sequencer;
void SetUp() override {
createLinks();
_pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister());
_sender.setPendingMessageTracker(*_pendingTracker);
+ _sender.set_operation_sequencer(_operation_sequencer);
}
void TearDown() override {
@@ -250,7 +253,8 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
// at will.
auto msg = std::make_shared<api::SetBucketStateCommand>(
makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE);
- msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1));
+ vespalib::string storage("storage");
+ msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1));
_pendingTracker->insert(msg);
sendReply(op);
@@ -396,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) {
// Should not block on nodes _not_ included in operation node set
_pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10));
- EXPECT_FALSE(op.isBlocked(*_pendingTracker));
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
// Node 1 is included in operation node set and should cause a block
_pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
getClock().addSecondsToTime(11);
- EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy
// Should block on other operation nodes than the first listed as well
_pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10));
- EXPECT_TRUE(op.isBlocked(*_pendingTracker));
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+}
+
+TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t");
+ enableDistributorClusterState("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer));
+ auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo");
+ EXPECT_TRUE(token.valid());
+ EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer));
}
TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) {