summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2020-07-02 11:04:13 +0000
committerHåvard Pettersen <havardpe@oath.com>2020-07-02 11:04:13 +0000
commit66b9ce0c2189729a310cb44f2b425484d149463f (patch)
tree5380dc276c5b0db2b1f114b496b40b378008f6f6 /staging_vespalib
parent87372a16c7ec43a5babcf44bb5ff109ac3369b2e (diff)
improve sync
- lock once to post all sync tasks - only post to non-idle strands - let sync tasks bypass blocking based on task limit
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp67
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h1
2 files changed, 42 insertions, 26 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 3e87749c794..765d7b6c9f5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -1,6 +1,7 @@
// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "adaptive_sequenced_executor.h"
+#include <vespa/vespalib/util/lambdatask.h>
namespace vespalib {
@@ -218,6 +219,35 @@ AdaptiveSequencedExecutor::worker_main()
_thread_tools->allow_worker_exit.await();
}
+void
+AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock)
+{
+ assert(_self.state != Self::State::CLOSED);
+ strand.queue.push(std::move(task));
+ _stats.queueSize.add(++_self.pending_tasks);
+ ++_stats.acceptedTasks;
+ if (strand.state == Strand::State::WAITING) {
+ ++_self.waiting_tasks;
+ } else if (strand.state == Strand::State::IDLE) {
+ if (_worker_stack.size() < _cfg.num_threads) {
+ strand.state = Strand::State::WAITING;
+ _wait_queue.push(&strand);
+ _self.waiting_tasks += strand.queue.size();
+ } else {
+ strand.state = Strand::State::ACTIVE;
+ assert(_wait_queue.empty());
+ Worker *worker = _worker_stack.back();
+ _worker_stack.popBack();
+ assert(worker->state == Worker::State::BLOCKED);
+ assert(worker->strand == nullptr);
+ worker->state = Worker::State::RUNNING;
+ worker->strand = &strand;
+ lock.unlock(); // UNLOCK
+ worker->cond.notify_one();
+ }
+ }
+}
+
AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending)
: ISequencedTaskExecutor(num_strands),
@@ -268,38 +298,23 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
Strand &strand = _strands[id.getId()];
auto guard = std::unique_lock(_mutex);
maybe_block_self(guard);
- assert(_self.state != Self::State::CLOSED);
- strand.queue.push(std::move(task));
- _stats.queueSize.add(++_self.pending_tasks);
- ++_stats.acceptedTasks;
- if (strand.state == Strand::State::WAITING) {
- ++_self.waiting_tasks;
- } else if (strand.state == Strand::State::IDLE) {
- if (_worker_stack.size() < _cfg.num_threads) {
- strand.state = Strand::State::WAITING;
- _wait_queue.push(&strand);
- _self.waiting_tasks += strand.queue.size();
- } else {
- strand.state = Strand::State::ACTIVE;
- assert(_wait_queue.empty());
- Worker *worker = _worker_stack.back();
- _worker_stack.popBack();
- assert(worker->state == Worker::State::BLOCKED);
- assert(worker->strand == nullptr);
- worker->state = Worker::State::RUNNING;
- worker->strand = &strand;
- guard.unlock(); // UNLOCK
- worker->cond.notify_one();
- }
- }
+ run_task_in_strand(std::move(task), strand, guard);
}
void
AdaptiveSequencedExecutor::sync()
{
vespalib::CountDownLatch latch(_strands.size());
- for (size_t i = 0; i < _strands.size(); ++i) {
- execute(ExecutorId(i), [&](){ latch.countDown(); });
+ {
+ auto guard = std::unique_lock(_mutex);
+ for (Strand &strand: _strands) {
+ if (strand.state == Strand::State::IDLE) {
+ latch.countDown();
+ } else {
+ Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); });
+ run_task_in_strand(std::move(task), strand, guard);
+ }
+ }
}
latch.await();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index a4d3ac97758..8f8b359797e 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -113,6 +113,7 @@ private:
bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
Task::UP next_task(Worker &worker);
void worker_main();
+ void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock);
public:
AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending);