aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
blob: 8e9c1ae3fa15ea09cb5389874df4644013fff23e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
#include <thread>
#include <atomic>

namespace vespalib {

/**
 * Has a single thread consuming tasks from a fixed size ringbuffer.
 * Made for throughput where the producer has no interaction with the consumer and
 * it is hence very cheap to produce a task. A low and a high watermark (documented
 * as 25% and 75%) are used to reduce ping-pong between producer and consumer.
 *
 * NOTE(review): the four-argument constructor takes an explicit watermark and the
 * value in effect is exposed through get_watermark(); confirm that the percentages
 * quoted above still match what the implementation does.
 *
 * The class is a public SyncableThreadExecutor. It inherits vespalib::Runnable
 * privately (no access specifier is given for that base), so run() is not part of
 * the public interface.
 */
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
    /**
     * @param func      Thread init function; presumably forwarded to the worker thread - confirm in the .cpp.
     * @param taskLimit Requested capacity of the task ring buffer.
     */
    explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
    /**
     * @param func         Thread init function; presumably forwarded to the worker thread - confirm in the .cpp.
     * @param taskLimit    Requested capacity of the task ring buffer.
     * @param watermark    Watermark value, readable afterwards via get_watermark().
     *                     NOTE(review): whether the value is clamped or adjusted is not visible here.
     * @param reactionTime Time value, readable afterwards via get_reaction_time(). Presumably bounds how
     *                     long the consumer may wait before reacting to new work - TODO confirm.
     */
    SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
    ~SingleExecutor() override;
    /**
     * Hands a task to the executor.
     * @return A task pointer; by the usual executor convention this is presumably null when the
     *         task was accepted and the task itself when it was rejected - verify against the base class.
     */
    Task::UP execute(Task::UP task) override;
    /** Requests a new task limit. Presumably recorded in _wantedTaskLimit and applied later - TODO confirm. */
    void setTaskLimit(uint32_t taskLimit) override;
    /** Returns a reference to this executor; the waiting semantics are defined by the base class and the .cpp. */
    SingleExecutor & sync() override;
    void wakeup() override;
    size_t getNumThreads() const override;
    /** Current task limit, read with a relaxed atomic load. */
    uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); }
    /** Watermark stored at construction (the member is const). */
    uint32_t get_watermark() const { return _watermark; }
    /** Reaction time stored at construction (the member is const). */
    duration get_reaction_time() const { return _reactionTime; }
    ExecutorStats getStats() override;
    SingleExecutor & shutdown() override;
private:
    // Lock type passed to the helpers below that take a lock/guard parameter.
    using Lock = std::unique_lock<std::mutex>;
    void drain(Lock & lock);
    // Overrides Runnable::run(); presumably the body executed by _thread - confirm in the .cpp.
    void run() override;
    void drain_tasks();
    void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
    void run_tasks_till(uint64_t available);
    void wait_for_room(Lock & guard);
    /**
     * Maps a counter onto a ring buffer slot by masking with (task limit - 1).
     * The mask is equivalent to `counter % taskLimit` only when the task limit is a
     * power of two - presumably guaranteed by the implementation; verify in the .cpp.
     */
    uint64_t index(uint64_t counter) const {
        return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
    }

    /**
     * Difference between the _wp and _rp counters (presumably write and read position,
     * i.e. the number of tasks currently queued). _wp is read relaxed and _rp with
     * acquire ordering.
     */
    uint64_t numTasks() const {
        return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
    }
    // NOTE(review): the per-member roles below are inferred from names and from the inline
    // accessors above; the code that uses them lives in the .cpp. Do not reorder the
    // members: declaration order fixes both layout and constructor initialization order.
    std::atomic<uint32_t>       _taskLimit;            // limit used by index() and returned by getTaskLimit()
    std::atomic<uint32_t>       _wantedTaskLimit;      // presumably the limit requested via setTaskLimit()
    std::atomic<uint64_t>       _rp;                   // counter subtracted in numTasks(); presumably the read position
    std::unique_ptr<Task::UP[]> _tasks;                // array of task pointers; presumably the ring buffer storage
    std::mutex                  _mutex;
    std::condition_variable     _consumerCondition;
    std::condition_variable     _producerCondition;
    vespalib::Thread            _thread;
    uint64_t                    _lastAccepted;
    ExecutorStats::QueueSizeT   _queueSize;
    std::atomic<uint64_t>       _wakeupConsumerAt;
    std::atomic<uint64_t>       _producerNeedWakeupAt;
    std::atomic<uint64_t>       _wp;                   // counter read first in numTasks(); presumably the write position
    const uint32_t              _watermark;            // returned by get_watermark()
    const duration              _reactionTime;         // returned by get_reaction_time()
    bool                        _closed;
};

}