diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-03 12:18:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-03 12:18:48 +0100 |
commit | 31457e8d51b51c62d29bbb000e6e0f2183c5d911 (patch) | |
tree | ab4e4c140e184bff3d479c89625faf7dc616c0bf /storage | |
parent | a335a2b692ebaf1bb77182aeec45023c6b56d2a7 (diff) | |
parent | 21786e66e55f6c39dc7c7998b9575e8cbad2e806 (diff) |
Merge pull request #15611 from vespa-engine/vekterli/allow-starting-deferred-tasks-concurrently-with-reads
Allow starting deferred tasks concurrently with reads
Diffstat (limited to 'storage')
5 files changed, 153 insertions, 42 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 3dc71bcb433..a6bd9d5d84c 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -175,6 +175,9 @@ public: BucketDatabase::Entry getBucket(const document::BucketId& bId) const; std::vector<document::BucketSpace> getBucketSpaces() const; + + DistributorMessageSenderStub& sender() noexcept { return _sender; } + const DistributorMessageSenderStub& sender() const noexcept { return _sender; } protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 71b51c9a7b6..72365c61597 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -93,6 +93,16 @@ public: return cmd; } + std::shared_ptr<api::GetCommand> create_get_to_node(uint16_t node) const { + document::BucketId bucket(16, 1234); + auto cmd = std::make_shared<api::GetCommand>( + makeDocumentBucket(bucket), + document::DocumentId("id::testdoctype1:n=1234:foo"), + "[all]"); + cmd->setAddress(makeStorageAddress(node)); + return cmd; + } + PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } @@ -444,7 +454,7 @@ document::BucketId bucket_of(const document::DocumentId& id) { } -TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) { +TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_write_ops) { Fixture f; auto cmd = f.createPutToNode(0); auto bucket_id = bucket_of(cmd->getDocumentId()); @@ -455,6 +465,18 @@ TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ EXPECT_EQ(state, TaskRunState::OK); } +TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_only_pending_read_ops) { + Fixture f; + auto cmd = f.create_get_to_node(0); + f.tracker().insert(cmd); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::OK); +} + TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); @@ -468,6 +490,20 @@ TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_c EXPECT_EQ(state, TaskRunState::OK); } +TEST_F(PendingMessageTrackerTest, deferred_task_can_be_started_with_pending_read_op) { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::Aborted); + f.tracker().insert(f.create_get_to_node(0)); // Concurrent Get and Put + f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be allowed to run + EXPECT_EQ(state, TaskRunState::OK); +} + TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 76112b1c729..daa2ca94bb3 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -19,6 +19,14 @@ using document::BucketId; namespace storage::distributor { +namespace { + +Bucket default_bucket(BucketId id) { + return Bucket(document::FixedBucketSpaces::default_space(), id); +} + +} + struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { document::TestDocMan _test_doc_man; VisitorOperation::Config _default_config; @@ -46,10 +54,6 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { close(); } - static Bucket default_bucket(BucketId id) { - return Bucket(document::FixedBucketSpaces::default_space(), id); - } - std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) { auto cmd = std::make_shared<api::CreateVisitorCommand>( document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", ""); @@ -88,37 +92,80 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); } +namespace { + +struct ConcurrentMutationFixture { + ReadForWriteVisitorOperationStarterTest& _test; + std::shared_ptr<api::StorageCommand> _mutation; + + explicit ConcurrentMutationFixture(ReadForWriteVisitorOperationStarterTest& test) : _test(test) {} + + void block_bucket_with_mutation() { + // Pending mutating op to same bucket, prevents visitor from starting + auto update = std::make_shared<document::DocumentUpdate>( + _test._test_doc_man.getTypeRepo(), + *_test._test_doc_man.getTypeRepo().getDocumentType("testdoctype1"), + document::DocumentId("id::testdoctype1:n=4:foo")); + auto update_cmd = std::make_shared<api::UpdateCommand>( + default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); + + Operation::SP mutating_op; + _test.getExternalOperationHandler().handleMessage(update_cmd, mutating_op); + ASSERT_TRUE(mutating_op); + _test._op_owner->start(mutating_op, OperationStarter::Priority(120)); + ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0", + _test.sender().getCommands(true, true)); + _mutation = _test.sender().command(0); + // Since pending message tracking normally happens in the distributor itself during sendUp, + // we have to emulate this and explicitly insert the sent message into the pending mapping. + _test.getDistributor().getPendingMessageTracker().insert(_mutation); + } + + void unblock_bucket() { + // Pretend update operation completed + auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); + _test.getDistributor().getPendingMessageTracker().reply(*update_reply); + _test._op_owner->handleReply(update_reply); + } +}; + +} + TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) { + ConcurrentMutationFixture f(*this); auto op = create_rfw_op(create_nested_visitor_op(true)); - // Pending mutating op to same bucket, prevents visitor from starting - auto update = std::make_shared<document::DocumentUpdate>( - _test_doc_man.getTypeRepo(), - *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"), - document::DocumentId("id::testdoctype1:n=4:foo")); - auto update_cmd = std::make_shared<api::UpdateCommand>( - default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); - - Operation::SP mutating_op; - getExternalOperationHandler().handleMessage(update_cmd, mutating_op); - ASSERT_TRUE(mutating_op); - _op_owner->start(mutating_op, OperationStarter::Priority(120)); - ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0", - _sender.getCommands(true, true)); - // Since pending message tracking normally happens in the distributor itself during sendUp, - // we have to emulate this and explicitly insert the sent message into the pending mapping. - getDistributor().getPendingMessageTracker().insert(_sender.command(0)); + ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation()); _op_owner->start(op, OperationStarter::Priority(120)); // Nothing started yet ASSERT_EQ("", _sender.getCommands(true, false, 1)); - - // Pretend update operation completed - auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply()); - getDistributor().getPendingMessageTracker().reply(*update_reply); - _op_owner->handleReply(update_reply); + ASSERT_NO_FATAL_FAILURE(f.unblock_bucket()); // Visitor should now be started! ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1)); } +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_removed_from_db_before_deferred_start) { + ConcurrentMutationFixture f(*this); + auto op = create_rfw_op(create_nested_visitor_op(true)); + ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation()); + + _op_owner->start(op, OperationStarter::Priority(120)); + // Nothing started yet + ASSERT_EQ("", _sender.getCommands(true, false, 1)); + + // Simulate that ownership of bucket has changed, or replica has gone down. + removeFromBucketDB(_sub_bucket); + ASSERT_NO_FATAL_FAILURE(f.unblock_bucket()); + + // No visitor should be sent to the content node + ASSERT_EQ("", _sender.getCommands(true, false, 1)); + // Instead, we should get a "bucket not found" transient error bounce back to the client. + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(BUCKET_NOT_FOUND)," + "UpdateReply(id::testdoctype1:n=4:foo, BucketId(0x0000000000000000), " + "timestamp 1, timestamp of updated doc: 0) ReturnCode(NONE)", + _sender.getReplies(false, true)); +} + } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 8027b5349f9..44ab91528f2 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -121,7 +121,7 @@ PendingMessageTracker::reply(const api::StorageReply& r) } LOG(debug, "Erased message with id %" PRIu64, msgId); msgs.erase(msgId); - auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket); + auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket); // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker. // To avoid deadlocking, we run the tasks outside the lock. // TODO remove locking entirely... Only exists for status pages! @@ -143,23 +143,49 @@ bool is_empty_range(const Range& range) noexcept { return (range.first == range.second); } +template <typename Range> +bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept { + if (is_empty_range(range)) { + return true; + } + // Number of ops to check is expected to be small in the common case + for (auto iter = range.first; iter != range.second; ++iter) { + switch (iter->msgType) { + case api::MessageType::GET_ID: + case api::MessageType::STAT_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::VISITOR_DESTROY_ID: + continue; + default: + return false; + } + } + return true; +} + +} + +bool +PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept +{ + auto& bucket_idx = boost::multi_index::get<2>(_messages); + auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); + return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket); } std::vector<std::unique_ptr<DeferredTask>> -PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket) +PendingMessageTracker::get_deferred_ops_if_bucket_writes_drained(const document::Bucket& bucket) { - if (_deferred_bucket_tasks.empty()) { + if (_deferred_read_tasks.empty()) { return {}; } std::vector<std::unique_ptr<DeferredTask>> tasks; - auto& bucket_idx = boost::multi_index::get<2>(_messages); - auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); - if (is_empty_range(pending_tasks_for_bucket)) { - auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket); + if (bucket_has_no_pending_write_ops(bucket)) { + auto waiting_tasks = _deferred_read_tasks.equal_range(bucket); for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) { tasks.emplace_back(std::move(task_iter->second)); } - _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second); + _deferred_read_tasks.erase(waiting_tasks.first, waiting_tasks.second); } return tasks; } @@ -168,13 +194,11 @@ void PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task) { std::unique_lock guard(_lock); - auto& bucket_idx = boost::multi_index::get<2>(_messages); - const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); - if (is_empty_range(pending_tasks_for_bucket)) { + if (bucket_has_no_pending_write_ops(bucket)) { guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock. task->run(TaskRunState::OK); // Nothing pending, run immediately. } else { - _deferred_bucket_tasks.emplace(bucket, std::move(task)); + _deferred_read_tasks.emplace(bucket, std::move(task)); } } @@ -182,7 +206,7 @@ void PendingMessageTracker::abort_deferred_tasks() { std::lock_guard guard(_lock); - for (auto& task : _deferred_bucket_tasks) { + for (auto& task : _deferred_read_tasks) { task.second->run(TaskRunState::Aborted); } } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 39ea5c9c1a6..51c112152b6 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -189,7 +189,7 @@ private: framework::Component _component; NodeInfo _nodeInfo; std::chrono::seconds _nodeBusyDuration; - DeferredBucketTaskMap _deferred_bucket_tasks; + DeferredBucketTaskMap _deferred_read_tasks; // Since distributor is currently single-threaded, this will only // contend when status page is being accessed. It is, however, required @@ -213,7 +213,8 @@ private: void getStatusPerBucket(std::ostream& out) const; TimePoint currentTime() const; - std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&); + [[nodiscard]] bool bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept; + std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_writes_drained(const document::Bucket&); }; } |