diff options
5 files changed, 62 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index e8401914abf..320c55f9998 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -8,6 +8,8 @@ #include <vespa/storage/distributor/operations/external/visitoroperation.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> #include <tests/distributor/distributortestutil.h> @@ -25,6 +27,11 @@ Bucket default_bucket(BucketId id) { return Bucket(document::FixedBucketSpaces::default_space(), id); } +api::StorageMessageAddress make_storage_address(uint16_t node) { + static vespalib::string _storage("storage"); + return {&_storage, lib::NodeType::STORAGE, node}; +} + } struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { @@ -46,6 +53,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { createLinks(); setupDistributor(1, 1, "version:1 distributor:1 storage:1"); _op_owner = std::make_unique<OperationOwner>(_sender, getClock()); + _sender.setPendingMessageTracker(getDistributor().getPendingMessageTracker()); addNodesToBucketDB(_sub_bucket, "0=1/2/3/t"); } @@ -81,6 +89,7 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_that_fails_precondition_checks_is_immediately_failed) { auto op = create_rfw_op(create_nested_visitor_op(false)); _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("", _sender.getCommands(true)); EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " "ReturnCode(ILLEGAL_PARAMETERS, No buckets in CreateVisitorCommand for visitor 'foo')", _sender.getLastReply()); @@ -92,6 +101,21 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); } +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_is_bounced_if_merge_pending_for_bucket) { + auto op = create_rfw_op(create_nested_visitor_op(true)); + std::vector<api::MergeBucketCommand::Node> nodes({{0, false}, {1, false}}); + auto merge = std::make_shared<api::MergeBucketCommand>(default_bucket(_sub_bucket), + std::move(nodes), + api::Timestamp(123456)); + merge->setAddress(make_storage_address(0)); + getDistributor().getPendingMessageTracker().insert(merge); + _op_owner->start(op, OperationStarter::Priority(120)); + ASSERT_EQ("", _sender.getCommands(true)); + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(BUSY, A merge operation is pending for this bucket)", + _sender.getLastReply()); +} + namespace { struct ConcurrentMutationFixture { diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp index 1e87172e870..5ebf20138a4 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.cpp @@ -2,6 +2,7 @@ #include "read_for_write_visitor_operation.h" #include "visitoroperation.h" +#include <vespa/storage/distributor/distributormessagesender.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/operationowner.h> #include <cassert> @@ -40,6 +41,11 @@ void ReadForWriteVisitorOperationStarter::onStart(DistributorMessageSender& send assert(_visitor_op->has_sent_reply()); return; } + if (bucket_has_pending_merge(*maybe_bucket, sender.getPendingMessageTracker())) { + LOG(debug, "A merge is pending for bucket %s, failing visitor", maybe_bucket->toString().c_str()); + _visitor_op->fail_with_merge_pending(sender); + return; + } auto bucket_handle = _operation_sequencer.try_acquire(*maybe_bucket); if (!bucket_handle.valid()) { LOG(debug, "An operation is already pending for bucket %s, failing visitor", @@ -71,4 +77,26 @@ void ReadForWriteVisitorOperationStarter::onReceive(DistributorMessageSender& se _visitor_op->onReceive(sender, msg); } +namespace { + +struct MergePendingChecker : PendingMessageTracker::Checker { + bool has_pending_merge = false; + bool check(uint32_t message_type, [[maybe_unused]] uint16_t node, [[maybe_unused]] uint8_t priority) override { + if (message_type == api::MessageType::MERGEBUCKET_ID) { + has_pending_merge = true; + } + return true; + } +}; + +} + +bool ReadForWriteVisitorOperationStarter::bucket_has_pending_merge(const document::Bucket& bucket, + const PendingMessageTracker& tracker) const { + MergePendingChecker merge_checker; + tracker.checkPendingMessages(bucket, merge_checker); + return merge_checker.has_pending_merge; +} + + } diff --git a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h index 06b4f60307e..a6b414e6fb5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h +++ b/storage/src/vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h @@ -43,6 +43,8 @@ public: void onStart(DistributorMessageSender& sender) override; void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; +private: + bool bucket_has_pending_merge(const document::Bucket&, const PendingMessageTracker& tracker) const; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 3d79aa176d7..5a8adb26cd8 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -887,6 +887,13 @@ VisitorOperation::fail_with_bucket_already_locked(DistributorMessageSender& send sendReply(api::ReturnCode(api::ReturnCode::BUSY, "This bucket is already locked by another operation"), sender); } +void +VisitorOperation::fail_with_merge_pending(DistributorMessageSender& sender) +{ + assert(is_read_for_write()); + sendReply(api::ReturnCode(api::ReturnCode::BUSY, "A merge operation is pending for this bucket"), sender); +} + std::optional<document::Bucket> VisitorOperation::first_bucket_to_visit() const { diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index 4043b0b2c50..e6ad7a042dd 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -51,6 +51,7 @@ public: // Only valid to call if is_read_for_write() == true void fail_with_bucket_already_locked(DistributorMessageSender& sender); + void fail_with_merge_pending(DistributorMessageSender& sender); [[nodiscard]] bool verify_command_and_expand_buckets(DistributorMessageSender& sender); |