blob: 8e9c1ae3fa15ea09cb5389874df4644013fff23e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
#include <thread>
#include <atomic>
namespace vespalib {
/**
* Has a single thread consuming tasks from a fixed size ringbuffer.
* Made for throughput where the producer has no interaction with the consumer and
* it is hence very cheap to produce a task. High and low watermarks at 25%/75% are used
* to reduce ping-pong.
*
* The class publicly implements SyncableThreadExecutor. Runnable is inherited
* privately (a class base without an access specifier defaults to private) and
* run() is declared in the private section, so it is not part of the public API.
*
* NOTE(review): the four-argument constructor accepts an explicit watermark;
* confirm in the implementation how that relates to the 25%/75% figures above.
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
/**
* @param func      thread init function (type init_fun_t is declared elsewhere).
* @param taskLimit requested task limit.
* NOTE(review): watermark and reaction time are not supplied here; the defaults
* chosen are defined in the implementation file - verify there.
*/
explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
/**
* @param func         thread init function (type init_fun_t is declared elsewhere).
* @param taskLimit    requested task limit.
* @param watermark    presumably the value reported by get_watermark() - verify.
* @param reactionTime presumably the value reported by get_reaction_time() - verify.
*/
SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
// Overrides of the executor interface; their contracts are defined by the base
// classes and their bodies live in the implementation file, not in this header.
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
// Returns a reference to this concrete type rather than to the base type.
SingleExecutor & sync() override;
void wakeup() override;
size_t getNumThreads() const override;
/** Returns the current value of _taskLimit, read with relaxed memory ordering. */
uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); }
/** Returns the watermark; the member is const, so the value never changes after construction. */
uint32_t get_watermark() const { return _watermark; }
/** Returns the reaction time; the member is const, so the value never changes after construction. */
duration get_reaction_time() const { return _reactionTime; }
ExecutorStats getStats() override;
// Returns a reference to this concrete type rather than to the base type.
SingleExecutor & shutdown() override;
private:
// Lock type passed by reference to the helpers below that take a Lock parameter.
using Lock = std::unique_lock<std::mutex>;
void drain(Lock & lock);
// Entry point inherited from Runnable; presumably executed by _thread - verify.
void run() override;
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
void wait_for_room(Lock & guard);
/**
* Reduces a counter by masking it with (_taskLimit - 1).
* The mask equals 'counter % _taskLimit' only when _taskLimit is a power of two.
* NOTE(review): presumably the implementation keeps the limit a power of two and
* the result is used as a slot index into _tasks - verify.
*/
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
/**
* Returns _wp - _rp using unsigned 64-bit arithmetic.
* _wp is loaded relaxed, _rp is loaded with acquire ordering.
* NOTE(review): presumably _wp/_rp are the write/read positions of the ring
* buffer, making this the number of queued tasks - verify.
*/
uint64_t numTasks() const {
return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
}
// NOTE: data members are initialized in declaration order; keep this order in
// sync with the constructor initializer list in the implementation file.
// Limit used for the mask in index() and reported by getTaskLimit().
std::atomic<uint32_t> _taskLimit;
// NOTE(review): presumably the limit requested through setTaskLimit() before it takes effect - verify.
std::atomic<uint32_t> _wantedTaskLimit;
// Subtracted from _wp in numTasks(); read there with acquire ordering.
std::atomic<uint64_t> _rp;
// Owned array of Task::UP slots.
std::unique_ptr<Task::UP[]> _tasks;
std::mutex _mutex;
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
uint64_t _lastAccepted;
ExecutorStats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
// Minuend in numTasks(); read there with relaxed ordering.
std::atomic<uint64_t> _wp;
// Immutable after construction; exposed through get_watermark().
const uint32_t _watermark;
// Immutable after construction; exposed through get_reaction_time().
const duration _reactionTime;
bool _closed;
};
}
|