1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "reactor.h"
#include <cassert>
namespace vespalib::portal {
// Out-of-line definition of the (defaulted) EventHandler destructor.
Reactor::EventHandler::~EventHandler() = default;
//-----------------------------------------------------------------------------
// Creates a token that ties 'fd' and 'handler' to 'reactor'. The reactor's
// live token count is incremented first, then the fd is added to the
// reactor's selector together with the handler and the given read/write flags.
Reactor::Token::Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write)
    : _reactor(reactor),
      _handler(handler),
      _fd(fd)
{
    Reactor &owner = _reactor;
    ++owner._token_cnt;
    owner._selector.add(_fd, _handler, read, write);
}
// Re-registers this token's fd/handler pair with the reactor's selector using
// new read/write flags.
void
Reactor::Token::update(bool read, bool write)
{
    auto &selector = _reactor._selector;
    selector.update(_fd, _handler, read, write);
}
// Detaches the token from its reactor. The steps are order-sensitive:
//  1. remove the fd from the selector,
//  2. cancel_token() (marks events to be skipped when called on the reactor
//     thread, otherwise blocks until the event loop has released waiters),
//  3. decrement the reactor's live token count.
Reactor::Token::~Token()
{
    Reactor &owner = _reactor;
    owner._selector.remove(_fd);
    owner.cancel_token(*this);
    --owner._token_cnt;
}
//-----------------------------------------------------------------------------
void
Reactor::cancel_token(const Token &)
{
if (std::this_thread::get_id() == _thread.get_id()) {
_skip_events = true;
} else {
std::unique_lock guard(_lock);
size_t old_gen = _sync_seq;
++_wait_cnt;
guard.unlock(); // UNLOCK
_selector.wakeup();
guard.lock(); // LOCK
while (_sync_seq == old_gen) {
_cond.wait(guard);
}
--_wait_cnt;
}
}
// Releases every thread currently blocked in cancel_token(): under the lock,
// if at least one waiter is registered, the sync sequence number is advanced
// and all waiters on the condition variable are notified. Does nothing when
// there are no waiters.
void
Reactor::release_tokens()
{
    std::lock_guard guard(_lock);
    const bool has_waiters = (_wait_cnt > 0);
    if (!has_waiters) {
        return;
    }
    ++_sync_seq;
    _cond.notify_all();
}
//-----------------------------------------------------------------------------
// Records that a wakeup was seen. event_loop() checks this flag after each
// dispatch round and, when set, calls release_tokens() and clears it.
// NOTE(review): presumably invoked by _selector.dispatch(*this) -- confirm
// against the selector implementation.
void
Reactor::handle_wakeup()
{
_was_woken = true;
}
// Forwards an event to 'handler' with the given read/write flags, unless
// event skipping is active (_skip_events is set by cancel_token() when a
// token is cancelled from the reactor thread), in which case the event is
// silently dropped.
void
Reactor::handle_event(EventHandler &handler, bool read, bool write)
{
    if (_skip_events) {
        return;
    }
    handler.handle_event(read, write);
}
// Body of the reactor thread. Until _done is set, each iteration:
//  - polls the selector, passing it the value produced by the tick function,
//  - dispatches the selector's results back into this reactor,
//  - clears the skip-events flag if it was raised during the iteration,
//  - and, if a wakeup was observed, releases threads blocked in
//    cancel_token() before clearing the wakeup flag.
void
Reactor::event_loop()
{
    while (!_done) {
        const auto tick_value = _tick();
        _selector.poll(tick_value);
        _selector.dispatch(*this);
        if (_skip_events) {
            // skipping only applies to the dispatch round that requested it
            _skip_events = false;
        }
        if (!_was_woken) {
            continue;
        }
        release_tokens();
        _was_woken = false;
    }
}
//-----------------------------------------------------------------------------
// Constructs the reactor and starts a thread running event_loop().
// 'tick' is stored in _tick; event_loop() invokes it once per iteration and
// passes the result to the selector's poll(). All flags and counters start
// out cleared/zero.
// NOTE(review): members are initialized in class declaration order, not in
// the order listed here; presumably _thread is declared last so the event
// loop only starts once everything else is set up -- confirm in reactor.h.
Reactor::Reactor(std::function<int()> tick)
: _selector(),
_tick(std::move(tick)),
_done(false),
_was_woken(false),
_skip_events(false),
_lock(),
_cond(),
_sync_seq(0),
_wait_cnt(0),
_token_cnt(0),
_thread(&Reactor::event_loop, this)
{
}
// Shuts down the reactor: asserts that no tokens are still attached, asks the
// event loop to stop, wakes the selector and joins the reactor thread.
Reactor::~Reactor()
{
// every Token must have been destroyed before the reactor goes away
assert(_token_cnt == 0);
// NOTE(review): _done is written here and read by event_loop() on the reactor
// thread; its declared type is not visible in this file -- confirm that this
// hand-off is properly synchronized.
_done = true;
// presumably makes a blocked poll() return so the loop can observe _done
_selector.wakeup();
_thread.join();
}
// Attaches 'fd' to this reactor: creates a Token bound to 'handler' with the
// given read/write flags and hands ownership of it to the caller. Destroying
// the returned token detaches the fd again (see Token's destructor).
// NOTE(review): the Token is created with a direct 'new'; presumably its
// constructor is not publicly accessible, which would rule out
// std::make_unique -- confirm in reactor.h before changing this.
Reactor::Token::UP
Reactor::attach(EventHandler &handler, int fd, bool read, bool write)
{
    Token::UP token(new Token(*this, handler, fd, read, write));
    return token;
}
//-----------------------------------------------------------------------------
} // namespace vespalib::portal
|