summaryrefslogtreecommitdiffstats
path: root/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
blob: 59cc9a39957cd1df807d93e14df4818aee63b138 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "singleexecutor.h"
#include <vespa/vespalib/util/time.h>

namespace vespalib {

// Creates the executor: sizes the task buffer to roundUp2inN(taskLimit) slots,
// zeroes all positions and counters, and starts the worker thread.
// NOTE(review): roundUp2inN presumably rounds up to a power of two so that
// index() can map a position to a slot cheaply - confirm against its definition.
SingleExecutor::SingleExecutor(uint32_t taskLimit)
    : _taskLimit(vespalib::roundUp2inN(taskLimit)),
      // Wanted limit starts equal to the current one, so wait_for_room() sees no pending resize.
      _wantedTaskLimit(_taskLimit.load()),
      // Read position: number of tasks the worker has completed (see run_tasks_till()).
      _rp(0),
      _tasks(std::make_unique<Task::UP[]>(_taskLimit)),
      _monitor(),
      // NOTE(review): the thread is handed *this; presumably it invokes run() - confirm in the header.
      _thread(*this),
      _lastAccepted(0),
      _maxPending(0),
      _wakeupConsumerAt(0),
      _producerNeedWakeup(false),
      // Write position: number of tasks accepted by execute().
      _wp(0)
{
    _thread.start();
}
// Tears the executor down: first waits (via sync()) until the worker has caught
// up with everything accepted so far, then stops the worker thread and joins it.
SingleExecutor::~SingleExecutor() {
    sync();
    auto && stoppingThread = _thread.stop();
    stoppingThread.join();
}

// Reports the number of worker threads; this executor always reports one.
size_t
SingleExecutor::getNumThreads() const {
    constexpr size_t workerThreads = 1;
    return workerThreads;
}

/**
 * Hands a task over to the worker thread.
 *
 * Blocks in wait_for_room() until the buffer has a free slot, stores the task
 * in the slot selected by the current write position and then publishes the
 * advanced write position. The consumer is signalled when the write position
 * used for this task equals the wake-up mark set by run().
 *
 * NOTE(review): _wp is read and then stored as wp + 1 without any exclusion,
 * so this looks like it expects a single producer at a time - confirm with callers.
 *
 * @param task the task to run; ownership is taken.
 * @return always an empty Task::UP (the task is never handed back).
 */
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
    wait_for_room();
    // The worker empties a slot and afterwards publishes _rp with release
    // semantics (see run_tasks_till()). This fence turns the read of _rp done
    // while waiting for room into an acquire, so that re-using the slot below
    // is ordered after the worker is done with it.
    // NOTE(review): assumes numTasks() (defined outside this file) reads _rp.
    std::atomic_thread_fence(std::memory_order_acquire);
    uint64_t wp = _wp.load(std::memory_order_relaxed);
    _tasks[index(wp)] = std::move(task);
    // Release: the slot written above must be visible to the worker before it
    // can observe the new write position (acquire load in drain_tasks()).
    _wp.store(wp + 1, std::memory_order_release);
    if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
        MonitorGuard guard(_monitor);
        guard.signal();
    }
    return Task::UP();
}

/**
 * Requests a new task limit. Only the wanted value is recorded here (after
 * rounding with roundUp2inN); the buffer itself is resized later from
 * wait_for_room().
 *
 * @param taskLimit the requested limit before rounding.
 */
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
    _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
}

/**
 * Blocks until the worker has completed every task that was accepted before
 * this call, polling the read position once per millisecond.
 *
 * @return *this, to allow chaining.
 */
SingleExecutor &
SingleExecutor::sync() {
    uint64_t wp = _wp.load(std::memory_order_relaxed);
    // Acquire pairs with the release store of _rp in run_tasks_till(), making
    // the side effects of the completed tasks visible to the caller on return.
    while (wp > _rp.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(1ms);
    }
    return *this;
}

/**
 * Worker thread main loop: runs all pending tasks, then naps on the monitor
 * for at most 1ms (or until signalled) and repeats until the thread is stopped.
 */
void
SingleExecutor::run() {
    while (!_thread.stopped()) {
        drain_tasks();
        // Ask the producer to signal once the write position reaches a quarter
        // of the task limit beyond the current write position.
        _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
        MonitorGuard guard(_monitor);
        guard.wait(1ms);
        // Back to 0 while the worker is busy draining.
        _wakeupConsumerAt.store(0, std::memory_order_relaxed);
    }
}

/**
 * Runs tasks in batches until numTasks() reports that none are pending.
 */
void
SingleExecutor::drain_tasks() {
    while (numTasks() > 0) {
        // Acquire pairs with the release store of _wp in execute(): every slot
        // below the observed write position is fully written before it is read.
        run_tasks_till(_wp.load(std::memory_order_acquire));
    }
}

/**
 * Runs the tasks in positions [_rp, available) in order, publishing the read
 * position after each one. Also tracks the largest backlog seen.
 *
 * @param available write position up to which (exclusive) tasks are run.
 */
void
SingleExecutor::run_tasks_till(uint64_t available) {
    uint64_t consumed = _rp.load(std::memory_order_relaxed);
    uint64_t left = available - consumed;
    if (_maxPending.load(std::memory_order_relaxed) < left) {
        _maxPending.store(left, std::memory_order_relaxed);
    }
    // When a producer is blocked waiting for room, signal it once all but the
    // last quarter of this batch has been consumed; 0 means no signal wanted.
    uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed)
            ? (available - (left >> 2))
            : 0;
    while (consumed  < available) {
        Task::UP task = std::move(_tasks[index(consumed)]);
        task->run();
        // Release: emptying the slot and the effects of the task happen-before
        // anyone who observes the advanced read position with acquire
        // (sync(), and the producer before it re-uses the slot).
        _rp.store(++consumed, std::memory_order_release);
        if (wakeupLimit == consumed) {
            MonitorGuard guard(_monitor);
            guard.signal();
        }
    }
}

void
SingleExecutor::wait_for_room() {
    if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
        sync();
        _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
        _taskLimit = _wantedTaskLimit.load();
    }
    while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
        _producerNeedWakeup.store(true, std::memory_order_relaxed);
        MonitorGuard guard(_monitor);
        guard.wait(1ms);
        _producerNeedWakeup.store(false, std::memory_order_relaxed);
    }
}

// Returns a statistics snapshot and resets the counters it is based on.
// The snapshot is built from the largest backlog recorded in _maxPending, the
// number of tasks accepted since the previous call, and a constant 0 as the
// third Stats argument (NOTE(review): presumably the rejected count - confirm
// against ThreadExecutor::Stats).
ThreadExecutor::Stats
SingleExecutor::getStats() {
    const uint64_t acceptedTotal = _wp.load(std::memory_order_relaxed);
    const auto acceptedSinceLast = acceptedTotal - _lastAccepted;
    Stats snapshot(_maxPending, acceptedSinceLast, 0);
    // Start a fresh measuring period.
    _lastAccepted = acceptedTotal;
    _maxPending = 0;
    return snapshot;
}


}