diff options
| field | value | date |
|---|---|---|
| author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-02 16:35:38 +0000 |
| committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-02 16:35:38 +0000 |
| commit | 21786e66e55f6c39dc7c7998b9575e8cbad2e806 (patch) | |
| tree | def4941c11a75a56fc3cdfbcdb09e3103d89fa01 /storage | |
| parent | c56dc531930ed27eadd347646b2ab3ee18cd8e15 (diff) | |
Allow starting deferred tasks concurrently with pending read ops

Since read ops aren't blocked by active bucket locks, a constant
stream of read ops towards a bucket could otherwise starve deferred
tasks, preventing them from ever starting.
Diffstat (limited to 'storage')
3 files changed, 77 insertions, 16 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 71b51c9a7b6..72365c61597 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -93,6 +93,16 @@ public: return cmd; } + std::shared_ptr<api::GetCommand> create_get_to_node(uint16_t node) const { + document::BucketId bucket(16, 1234); + auto cmd = std::make_shared<api::GetCommand>( + makeDocumentBucket(bucket), + document::DocumentId("id::testdoctype1:n=1234:foo"), + "[all]"); + cmd->setAddress(makeStorageAddress(node)); + return cmd; + } + PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } @@ -444,7 +454,7 @@ document::BucketId bucket_of(const document::DocumentId& id) { } -TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) { +TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_write_ops) { Fixture f; auto cmd = f.createPutToNode(0); auto bucket_id = bucket_of(cmd->getDocumentId()); @@ -455,6 +465,18 @@ TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ EXPECT_EQ(state, TaskRunState::OK); } +TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_only_pending_read_ops) { + Fixture f; + auto cmd = f.create_get_to_node(0); + f.tracker().insert(cmd); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::OK); +} + TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); @@ -468,6 +490,20 @@ TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_c EXPECT_EQ(state, TaskRunState::OK); } 
+TEST_F(PendingMessageTrackerTest, deferred_task_can_be_started_with_pending_read_op) { + Fixture f; + auto cmd = f.sendPut(RequestBuilder().toNode(0)); + auto bucket_id = bucket_of(cmd->getDocumentId()); + auto state = TaskRunState::Aborted; + f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ + state = s; + })); + EXPECT_EQ(state, TaskRunState::Aborted); + f.tracker().insert(f.create_get_to_node(0)); // Concurrent Get and Put + f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be allowed to run + EXPECT_EQ(state, TaskRunState::OK); +} + TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 8027b5349f9..44ab91528f2 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -121,7 +121,7 @@ PendingMessageTracker::reply(const api::StorageReply& r) } LOG(debug, "Erased message with id %" PRIu64, msgId); msgs.erase(msgId); - auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket); + auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket); // Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker. // To avoid deadlocking, we run the tasks outside the lock. // TODO remove locking entirely... Only exists for status pages! 
@@ -143,23 +143,49 @@ bool is_empty_range(const Range& range) noexcept { return (range.first == range.second); } +template <typename Range> +bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept { + if (is_empty_range(range)) { + return true; + } + // Number of ops to check is expected to be small in the common case + for (auto iter = range.first; iter != range.second; ++iter) { + switch (iter->msgType) { + case api::MessageType::GET_ID: + case api::MessageType::STAT_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::VISITOR_DESTROY_ID: + continue; + default: + return false; + } + } + return true; +} + +} + +bool +PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept +{ + auto& bucket_idx = boost::multi_index::get<2>(_messages); + auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); + return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket); } std::vector<std::unique_ptr<DeferredTask>> -PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket) +PendingMessageTracker::get_deferred_ops_if_bucket_writes_drained(const document::Bucket& bucket) { - if (_deferred_bucket_tasks.empty()) { + if (_deferred_read_tasks.empty()) { return {}; } std::vector<std::unique_ptr<DeferredTask>> tasks; - auto& bucket_idx = boost::multi_index::get<2>(_messages); - auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); - if (is_empty_range(pending_tasks_for_bucket)) { - auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket); + if (bucket_has_no_pending_write_ops(bucket)) { + auto waiting_tasks = _deferred_read_tasks.equal_range(bucket); for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) { tasks.emplace_back(std::move(task_iter->second)); } - _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second); + _deferred_read_tasks.erase(waiting_tasks.first, waiting_tasks.second); } 
return tasks; } @@ -168,13 +194,11 @@ void PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task) { std::unique_lock guard(_lock); - auto& bucket_idx = boost::multi_index::get<2>(_messages); - const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); - if (is_empty_range(pending_tasks_for_bucket)) { + if (bucket_has_no_pending_write_ops(bucket)) { guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock. task->run(TaskRunState::OK); // Nothing pending, run immediately. } else { - _deferred_bucket_tasks.emplace(bucket, std::move(task)); + _deferred_read_tasks.emplace(bucket, std::move(task)); } } @@ -182,7 +206,7 @@ void PendingMessageTracker::abort_deferred_tasks() { std::lock_guard guard(_lock); - for (auto& task : _deferred_bucket_tasks) { + for (auto& task : _deferred_read_tasks) { task.second->run(TaskRunState::Aborted); } } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 39ea5c9c1a6..51c112152b6 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -189,7 +189,7 @@ private: framework::Component _component; NodeInfo _nodeInfo; std::chrono::seconds _nodeBusyDuration; - DeferredBucketTaskMap _deferred_bucket_tasks; + DeferredBucketTaskMap _deferred_read_tasks; // Since distributor is currently single-threaded, this will only // contend when status page is being accessed. 
It is, however, required @@ -213,7 +213,8 @@ private: void getStatusPerBucket(std::ostream& out) const; TimePoint currentTime() const; - std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&); + [[nodiscard]] bool bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept; + std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_writes_drained(const document::Bucket&); }; } |