summaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
blob: f19ff36dbfb123fe3a6fb982bbafef9cb31cf63c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "scheduled_forward_executor.h"
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <condition_variable>
#include <cassert>

using vespalib::makeLambdaTask;

namespace proton {

class ScheduledForwardExecutor::State {
public:
    State() :
        _mutex(),
        _cond(),
        _handle(),
        _start_success(0),
        _start_failed(0),
        _running(false)
    {}
    ~State() {
        std::lock_guard guard(_mutex);
        assert( !_handle );
        assert( ! _running);
    }
    /// Returns false if it was already running
    bool start() {
        std::lock_guard guard(_mutex);
        bool already_running = _running;
        _running = true;
        if (already_running) {
            _start_failed++;
        } else {
            _start_success++;
        }
        _cond.notify_all();
        return ! already_running;
    }
    void complete() {
        std::lock_guard guard(_mutex);
        bool was_running = _running;
        _running = false;
        assert(was_running);
        _cond.notify_all();
    }
    void setHandle(Handle handle) {
        std::lock_guard guard(_mutex);
        _handle = std::move(handle);
    }
    void cancel() {
        std::unique_lock guard(_mutex);
        _handle.reset();
        while(_running) {
            _cond.wait(guard);
        }
    }
private:
    std::mutex              _mutex;
    std::condition_variable _cond;
    Handle                  _handle;
    uint64_t                _start_success;
    uint64_t                _start_failed;
    bool                    _running;
};

class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback {
private:
    ScheduledForwardExecutor & _executor;
    uint64_t                   _key;
public:
    Registration(ScheduledForwardExecutor & executor, uint64_t key) : _executor(executor), _key(key) {}
    ~Registration() {
        _executor.cancel(_key);
    }
};

ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport,
                                                   Executor& executor)
    : _scheduler(transport),
      _executor(executor),
      _lock(),
      _nextKey(0),
      _taskList()
{
}

ScheduledForwardExecutor::~ScheduledForwardExecutor() {
    std::lock_guard guard(_lock);
    assert(_taskList.empty());
}

bool
ScheduledForwardExecutor::cancel(uint64_t key)
{
    std::lock_guard guard(_lock);
    auto found = _taskList.find(key);
    if (found == _taskList.end()) return false;
    found->second->cancel();
    _taskList.erase(found);
    return true;
}

IScheduledExecutor::Handle
ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task,
                                              duration delay, duration interval)
{
    std::lock_guard guard(_lock);
    uint64_t key = _nextKey++;
    auto state = std::make_unique<State>();
    std::shared_ptr<Executor::Task> my_task = std::move(task);
    auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task), my_state=state.get()]() {
        bool start_allowed = my_state->start();
        if (start_allowed) {
            _executor.execute(makeLambdaTask([my_state, my_task]() {
                my_task->run();
                my_state->complete();
            }));
        }
    }), delay, interval);
    state->setHandle(std::move(handle));
    _taskList[key] = std::move(state);
    return std::make_unique<Registration>(*this, key);
}

}