summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-02 16:35:38 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-02 16:35:38 +0000
commit21786e66e55f6c39dc7c7998b9575e8cbad2e806 (patch)
treedef4941c11a75a56fc3cdfbcdb09e3103d89fa01 /storage
parentc56dc531930ed27eadd347646b2ab3ee18cd8e15 (diff)
Allow starting deferred tasks concurrently with pending read ops
Since read ops aren't blocked by active bucket locks, a constant stream of read ops towards a bucket could otherwise starve deferred tasks from starting.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp38
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.cpp50
-rw-r--r--storage/src/vespa/storage/distributor/pendingmessagetracker.h5
3 files changed, 77 insertions, 16 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 71b51c9a7b6..72365c61597 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -93,6 +93,16 @@ public:
return cmd;
}
+ std::shared_ptr<api::GetCommand> create_get_to_node(uint16_t node) const {
+ document::BucketId bucket(16, 1234);
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(bucket),
+ document::DocumentId("id::testdoctype1:n=1234:foo"),
+ "[all]");
+ cmd->setAddress(makeStorageAddress(node));
+ return cmd;
+ }
+
PendingMessageTracker& tracker() { return *_tracker; }
auto& clock() { return _clock; }
@@ -444,7 +454,7 @@ document::BucketId bucket_of(const document::DocumentId& id) {
}
-TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_ops) {
+TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_write_ops) {
Fixture f;
auto cmd = f.createPutToNode(0);
auto bucket_id = bucket_of(cmd->getDocumentId());
@@ -455,6 +465,18 @@ TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_
EXPECT_EQ(state, TaskRunState::OK);
}
+TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_only_pending_read_ops) {
+ Fixture f;
+ auto cmd = f.create_get_to_node(0);
+ f.tracker().insert(cmd);
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
@@ -468,6 +490,20 @@ TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_c
EXPECT_EQ(state, TaskRunState::OK);
}
+TEST_F(PendingMessageTrackerTest, deferred_task_can_be_started_with_pending_read_op) {
+ Fixture f;
+ auto cmd = f.sendPut(RequestBuilder().toNode(0));
+ auto bucket_id = bucket_of(cmd->getDocumentId());
+ auto state = TaskRunState::Aborted;
+ f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){
+ state = s;
+ }));
+ EXPECT_EQ(state, TaskRunState::Aborted);
+ f.tracker().insert(f.create_get_to_node(0)); // Concurrent Get and Put
+ f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be allowed to run
+ EXPECT_EQ(state, TaskRunState::OK);
+}
+
TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) {
Fixture f;
auto cmd = f.sendPut(RequestBuilder().toNode(0));
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
index 8027b5349f9..44ab91528f2 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp
@@ -121,7 +121,7 @@ PendingMessageTracker::reply(const api::StorageReply& r)
}
LOG(debug, "Erased message with id %" PRIu64, msgId);
msgs.erase(msgId);
- auto deferred_tasks = get_deferred_ops_if_bucket_pending_drained(bucket);
+ auto deferred_tasks = get_deferred_ops_if_bucket_writes_drained(bucket);
// Deferred tasks may try to send messages, which in turn will invoke the PendingMessageTracker.
// To avoid deadlocking, we run the tasks outside the lock.
// TODO remove locking entirely... Only exists for status pages!
@@ -143,23 +143,49 @@ bool is_empty_range(const Range& range) noexcept {
return (range.first == range.second);
}
+template <typename Range>
+bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept {
+ if (is_empty_range(range)) {
+ return true;
+ }
+ // Number of ops to check is expected to be small in the common case
+ for (auto iter = range.first; iter != range.second; ++iter) {
+ switch (iter->msgType) {
+ case api::MessageType::GET_ID:
+ case api::MessageType::STAT_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::VISITOR_DESTROY_ID:
+ continue;
+ default:
+ return false;
+ }
+ }
+ return true;
+}
+
+}
+
+bool
+PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept
+{
+ auto& bucket_idx = boost::multi_index::get<2>(_messages);
+ auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
+ return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket);
}
std::vector<std::unique_ptr<DeferredTask>>
-PendingMessageTracker::get_deferred_ops_if_bucket_pending_drained(const document::Bucket& bucket)
+PendingMessageTracker::get_deferred_ops_if_bucket_writes_drained(const document::Bucket& bucket)
{
- if (_deferred_bucket_tasks.empty()) {
+ if (_deferred_read_tasks.empty()) {
return {};
}
std::vector<std::unique_ptr<DeferredTask>> tasks;
- auto& bucket_idx = boost::multi_index::get<2>(_messages);
- auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
- if (is_empty_range(pending_tasks_for_bucket)) {
- auto waiting_tasks = _deferred_bucket_tasks.equal_range(bucket);
+ if (bucket_has_no_pending_write_ops(bucket)) {
+ auto waiting_tasks = _deferred_read_tasks.equal_range(bucket);
for (auto task_iter = waiting_tasks.first; task_iter != waiting_tasks.second; ++task_iter) {
tasks.emplace_back(std::move(task_iter->second));
}
- _deferred_bucket_tasks.erase(waiting_tasks.first, waiting_tasks.second);
+ _deferred_read_tasks.erase(waiting_tasks.first, waiting_tasks.second);
}
return tasks;
}
@@ -168,13 +194,11 @@ void
PendingMessageTracker::run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task)
{
std::unique_lock guard(_lock);
- auto& bucket_idx = boost::multi_index::get<2>(_messages);
- const auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket);
- if (is_empty_range(pending_tasks_for_bucket)) {
+ if (bucket_has_no_pending_write_ops(bucket)) {
guard.unlock(); // Must not be held whilst running task, or else recursive sends will deadlock.
task->run(TaskRunState::OK); // Nothing pending, run immediately.
} else {
- _deferred_bucket_tasks.emplace(bucket, std::move(task));
+ _deferred_read_tasks.emplace(bucket, std::move(task));
}
}
@@ -182,7 +206,7 @@ void
PendingMessageTracker::abort_deferred_tasks()
{
std::lock_guard guard(_lock);
- for (auto& task : _deferred_bucket_tasks) {
+ for (auto& task : _deferred_read_tasks) {
task.second->run(TaskRunState::Aborted);
}
}
diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
index 39ea5c9c1a6..51c112152b6 100644
--- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h
+++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h
@@ -189,7 +189,7 @@ private:
framework::Component _component;
NodeInfo _nodeInfo;
std::chrono::seconds _nodeBusyDuration;
- DeferredBucketTaskMap _deferred_bucket_tasks;
+ DeferredBucketTaskMap _deferred_read_tasks;
// Since distributor is currently single-threaded, this will only
// contend when status page is being accessed. It is, however, required
@@ -213,7 +213,8 @@ private:
void getStatusPerBucket(std::ostream& out) const;
TimePoint currentTime() const;
- std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_pending_drained(const document::Bucket&);
+ [[nodiscard]] bool bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept;
+ std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_writes_drained(const document::Bucket&);
};
}