aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib/src
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2020-07-02 12:45:57 +0000
committerHåvard Pettersen <havardpe@oath.com>2020-07-02 12:45:57 +0000
commitbf3726b1ba88cbe9987fea567e78de06f1bc2559 (patch)
tree1049fde75b8e68416a85de6c0b3fbdc26ac356b2 /staging_vespalib/src
parent1e1b35fa0b2589994f8d01fd8676659daf94791b (diff)
use event barrier for sync
Diffstat (limited to 'staging_vespalib/src')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp85
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h32
2 files changed, 68 insertions, 49 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 765d7b6c9f5..8139fd6d11b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -1,7 +1,6 @@
// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "adaptive_sequenced_executor.h"
-#include <vespa/vespalib/util/lambdatask.h>
namespace vespalib {
@@ -180,12 +179,15 @@ AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std:
return true;
}
-AdaptiveSequencedExecutor::Task::UP
-AdaptiveSequencedExecutor::next_task(Worker &worker)
+AdaptiveSequencedExecutor::TaggedTask
+AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> prev_token)
{
- Task::UP task;
+ TaggedTask task;
Worker *worker_to_wake = nullptr;
auto guard = std::unique_lock(_mutex);
+ if (prev_token.has_value()) {
+ _barrier.completeEvent(prev_token.value());
+ }
if (exchange_strand(worker, guard)) {
assert(worker.state == Worker::State::RUNNING);
assert(worker.strand != nullptr);
@@ -213,41 +215,14 @@ void
AdaptiveSequencedExecutor::worker_main()
{
Worker worker;
- while (Task::UP my_task = next_task(worker)) {
- my_task->run();
+ std::optional<uint32_t> prev_token = std::nullopt;
+ while (TaggedTask my_task = next_task(worker, prev_token)) {
+ my_task.task->run();
+ prev_token = my_task.token;
}
_thread_tools->allow_worker_exit.await();
}
-void
-AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock)
-{
- assert(_self.state != Self::State::CLOSED);
- strand.queue.push(std::move(task));
- _stats.queueSize.add(++_self.pending_tasks);
- ++_stats.acceptedTasks;
- if (strand.state == Strand::State::WAITING) {
- ++_self.waiting_tasks;
- } else if (strand.state == Strand::State::IDLE) {
- if (_worker_stack.size() < _cfg.num_threads) {
- strand.state = Strand::State::WAITING;
- _wait_queue.push(&strand);
- _self.waiting_tasks += strand.queue.size();
- } else {
- strand.state = Strand::State::ACTIVE;
- assert(_wait_queue.empty());
- Worker *worker = _worker_stack.back();
- _worker_stack.popBack();
- assert(worker->state == Worker::State::BLOCKED);
- assert(worker->strand == nullptr);
- worker->state = Worker::State::RUNNING;
- worker->strand = &strand;
- lock.unlock(); // UNLOCK
- worker->cond.notify_one();
- }
- }
-}
-
AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending)
: ISequencedTaskExecutor(num_strands),
@@ -297,26 +272,44 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
assert(id.getId() < _strands.size());
Strand &strand = _strands[id.getId()];
auto guard = std::unique_lock(_mutex);
+ assert(_self.state != Self::State::CLOSED);
maybe_block_self(guard);
- run_task_in_strand(std::move(task), strand, guard);
+ strand.queue.push(TaggedTask(std::move(task), _barrier.startEvent()));
+ _stats.queueSize.add(++_self.pending_tasks);
+ ++_stats.acceptedTasks;
+ if (strand.state == Strand::State::WAITING) {
+ ++_self.waiting_tasks;
+ } else if (strand.state == Strand::State::IDLE) {
+ if (_worker_stack.size() < _cfg.num_threads) {
+ strand.state = Strand::State::WAITING;
+ _wait_queue.push(&strand);
+ _self.waiting_tasks += strand.queue.size();
+ } else {
+ strand.state = Strand::State::ACTIVE;
+ assert(_wait_queue.empty());
+ Worker *worker = _worker_stack.back();
+ _worker_stack.popBack();
+ assert(worker->state == Worker::State::BLOCKED);
+ assert(worker->strand == nullptr);
+ worker->state = Worker::State::RUNNING;
+ worker->strand = &strand;
+ guard.unlock(); // UNLOCK
+ worker->cond.notify_one();
+ }
+ }
}
void
AdaptiveSequencedExecutor::sync()
{
- vespalib::CountDownLatch latch(_strands.size());
+ BarrierCompletion barrierCompletion;
{
- auto guard = std::unique_lock(_mutex);
- for (Strand &strand: _strands) {
- if (strand.state == Strand::State::IDLE) {
- latch.countDown();
- } else {
- Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); });
- run_task_in_strand(std::move(task), strand, guard);
- }
+ auto guard = std::scoped_lock(_mutex);
+ if (!_barrier.startBarrier(barrierCompletion)) {
+ return;
}
}
- latch.await();
+ barrierCompletion.gate.await();
}
void
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index 8f8b359797e..c52b9b22245 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -5,9 +5,12 @@
#include "isequencedtaskexecutor.h"
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/eventbarrier.hpp>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/fastos/thread.h>
#include <mutex>
#include <condition_variable>
+#include <optional>
#include <cassert>
namespace vespalib {
@@ -23,6 +26,24 @@ private:
using Stats = vespalib::ExecutorStats;
using Task = vespalib::Executor::Task;
+ struct TaggedTask {
+ Task::UP task;
+ uint32_t token;
+ TaggedTask() : task(nullptr), token(0) {}
+ TaggedTask(Task::UP task_in, uint32_t token_in)
+ : task(std::move(task_in)), token(token_in) {}
+ TaggedTask(TaggedTask &&rhs) = default;
+ TaggedTask(const TaggedTask &rhs) = delete;
+ TaggedTask &operator=(const TaggedTask &rhs) = delete;
+ TaggedTask &operator=(TaggedTask &&rhs) {
+ assert(task.get() == nullptr); // no overwrites
+ task = std::move(rhs.task);
+ token = rhs.token;
+ return *this;
+ }
+ operator bool() const { return bool(task); }
+ };
+
/**
* Values used to configure the executor.
**/
@@ -51,7 +72,7 @@ private:
struct Strand {
enum class State { IDLE, WAITING, ACTIVE };
State state;
- vespalib::ArrayQueue<Task::UP> queue;
+ vespalib::ArrayQueue<TaggedTask> queue;
Strand();
~Strand();
};
@@ -96,11 +117,17 @@ private:
void close();
};
+ struct BarrierCompletion {
+ Gate gate;
+ void completeBarrier() { gate.countDown(); }
+ };
+
std::unique_ptr<ThreadTools> _thread_tools;
std::mutex _mutex;
std::vector<Strand> _strands;
vespalib::ArrayQueue<Strand*> _wait_queue;
vespalib::ArrayQueue<Worker*> _worker_stack;
+ EventBarrier<BarrierCompletion> _barrier;
Self _self;
Stats _stats;
Config _cfg;
@@ -111,9 +138,8 @@ private:
Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock);
bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
- Task::UP next_task(Worker &worker);
+ TaggedTask next_task(Worker &worker, std::optional<uint32_t> prev_token);
void worker_main();
- void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock);
public:
AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending);