aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
blob: 230ff922e80fa8da321e00268f3f89caa4eeae14 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "scheduled_forward_executor.h"
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <condition_variable>
#include <cassert>

using vespalib::makeLambdaTask;

namespace proton {

/**
 * Bookkeeping for one scheduled task.
 *
 * Tracks whether a forwarded run is currently in flight (so overlapping runs
 * can be refused) and owns the handle that keeps the task scheduled.
 * All members are guarded by _mutex; _cond is notified whenever a run completes.
 */
class ScheduledForwardExecutor::State {
public:
    State() :
        _mutex(),
        _cond(),
        _handle(),
        _start_success(0),
        _start_failed(0),
        _running(false)
    {}
    /// The state must have been cancelled (handle released, no run in flight) before it is destroyed.
    ~State() {
        std::lock_guard guard(_mutex);
        assert( !_handle );
        assert( ! _running);
    }
    /// Marks a run as started. Returns false if it was already running.
    bool start() {
        std::lock_guard guard(_mutex);
        if (_running) {
            _start_failed++;
            return false;
        }
        _running = true;
        _start_success++;
        // No notification here: the only waiter (cancel()) waits for _running to become false.
        return true;
    }
    /// Marks the in-flight run as finished and wakes up anyone blocked in cancel().
    void complete() {
        std::lock_guard guard(_mutex);
        bool was_running = _running;
        _running = false;
        assert(was_running);
        _cond.notify_all();
    }
    /// Takes ownership of the handle that keeps the task scheduled.
    void setHandle(Handle handle) {
        std::lock_guard guard(_mutex);
        _handle = std::move(handle);
    }
    /// Releases the scheduling handle and blocks until no run is in flight.
    void cancel() {
        Handle released;
        {
            std::lock_guard guard(_mutex);
            released = std::move(_handle);
            _handle.reset();
        }
        // Destroy the handle without holding _mutex.
        // NOTE(review): the handle's destructor is not visible here; if it blocks until an
        // in-progress scheduler callback has returned, holding _mutex while destroying it
        // could deadlock against that callback waiting for _mutex in start().
        released.reset();
        std::unique_lock guard(_mutex);
        while (_running) {
            _cond.wait(guard);
        }
    }
private:
    std::mutex              _mutex;
    std::condition_variable _cond;
    Handle                  _handle;
    // Statistics only; never read in the visible code.
    uint64_t                _start_success;
    uint64_t                _start_failed;
    bool                    _running;
};

/**
 * Handle returned to the caller of scheduleAtFixedRate().
 * Destroying it cancels the task registered under the stored key.
 */
class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback {
public:
    Registration(ScheduledForwardExecutor & executor, uint64_t key)
        : _executor(executor),
          _key(key)
    {}
    ~Registration() override {
        _executor.cancel(_key);
    }
private:
    ScheduledForwardExecutor & _executor; // executor that owns the task
    uint64_t                   _key;      // key of the task inside the executor
};

/// Builds the internal scheduler on top of the given transport and remembers
/// the executor that forwarded tasks are handed to. No tasks are registered initially.
ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor)
    : _scheduler(transport),
      _executor(executor),
      _lock(),
      _nextKey(0),
      _taskList()
{ }

/// Every registered task must have been cancelled before the executor goes away.
ScheduledForwardExecutor::~ScheduledForwardExecutor()
{
    std::lock_guard task_list_guard(_lock);
    assert(_taskList.empty());
}

/**
 * Cancels the task registered under the given key.
 *
 * @return false if no task is registered under the key, true once the task
 *         has been removed and its State::cancel() has returned.
 */
bool
ScheduledForwardExecutor::cancel(uint64_t key)
{
    std::unique_ptr<State> detached;
    {
        // Only the removal from the task list happens under _lock.
        std::lock_guard guard(_lock);
        auto pos = _taskList.find(key);
        if (pos == _taskList.end()) {
            return false;
        }
        detached = std::move(pos->second);
        _taskList.erase(pos);
    }
    // State::cancel() may block, so it is called after _lock has been released.
    detached->cancel();
    return true;
}

/**
 * Schedules the task with the internal scheduler; on every tick the task is
 * forwarded to the executor, unless the previous forwarded run has not completed.
 *
 * @param task     the task to run; shared between ticks
 * @param delay    passed through to the internal scheduler
 * @param interval passed through to the internal scheduler
 * @return a handle whose destruction cancels the task
 */
IScheduledExecutor::Handle
ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task,
                                              duration delay, duration interval)
{
    std::lock_guard guard(_lock);
    uint64_t key = _nextKey++;
    auto state = std::make_unique<State>();
    // The same task object is run on every tick, so it is shared between the closures.
    std::shared_ptr<Executor::Task> my_task = std::move(task);
    auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([this, my_task = std::move(my_task), my_state=state.get()]() {
        if ( ! my_state->start()) {
            return; // previous run has not completed yet, skip this tick
        }
        auto rejected = _executor.execute(makeLambdaTask([my_state, my_task]() {
            my_task->run();
            my_state->complete();
        }));
        if (rejected) {
            // The executor handed the task back without running it. Undo start(), otherwise
            // the state stays 'running' forever: later ticks are skipped and cancel() never returns.
            my_state->complete();
        }
    }), delay, interval);
    state->setHandle(std::move(handle));
    _taskList[key] = std::move(state);
    return std::make_unique<Registration>(*this, key);
}

}