aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/vespa/vespalib/portal/reactor.cpp
blob: a2ef30b60d8ab5323e495aaca925fbd393073b17 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "reactor.h"
#include <cassert>

namespace vespalib::portal {

// Out-of-line definition of the (defaulted) EventHandler destructor.
Reactor::EventHandler::~EventHandler() = default;

//-----------------------------------------------------------------------------

// Creates a token tying 'fd' and 'handler' to 'reactor'.
//
// The reactor's token counter is bumped first, then the descriptor is added
// to the reactor's selector with the requested read/write flags.
Reactor::Token::Token(Reactor &reactor, EventHandler &handler, int fd, bool read, bool write)
    : _reactor(reactor),
      _handler(handler),
      _fd(fd)
{
    // 'reactor' and '_reactor' refer to the same object at this point.
    ++reactor._token_cnt;
    reactor._selector.add(_fd, _handler, read, write);
}

// Forwards new read/write flags for this token's descriptor and handler to
// the selector owned by the reactor.
void
Reactor::Token::update(bool read, bool write)
{
    _reactor._selector.update(_fd, _handler, read, write);
}

// Tears the token down in three ordered steps: remove the descriptor from the
// selector, call cancel_token(), and finally decrement the token counter.
Reactor::Token::~Token()
{
    _reactor._selector.remove(_fd);
    // When invoked from a thread other than the reactor thread, cancel_token()
    // blocks until the event loop has called release_tokens().
    _reactor.cancel_token(*this);
    --_reactor._token_cnt;
}

//-----------------------------------------------------------------------------

void
Reactor::cancel_token(const Token &)
{
    if (std::this_thread::get_id() == _thread.get_id()) {
        _skip_events = true;
    } else {
        std::unique_lock guard(_lock);
        size_t old_gen = _sync_seq;
        ++_wait_cnt;
        guard.unlock(); // UNLOCK
        _selector.wakeup();
        guard.lock(); // LOCK
        while (_sync_seq == old_gen) {
            _cond.wait(guard);
        }
        --_wait_cnt;
    }
}

// Advances the sync sequence number and notifies all threads blocked on the
// condition variable, but only when at least one waiter is registered.
// The whole operation runs with _lock held.
void
Reactor::release_tokens()
{
    std::lock_guard guard(_lock);
    const bool has_waiters = (_wait_cnt > 0);
    if (!has_waiters) {
        return;
    }
    ++_sync_seq;
    _cond.notify_all();
}

//-----------------------------------------------------------------------------

// Records that a wakeup was seen; event_loop() checks and clears _was_woken
// after dispatching and calls release_tokens() when it is set.
void
Reactor::handle_wakeup()
{
    _was_woken = true;
}

// Passes an event on to 'handler' unless _skip_events is currently set, in
// which case the event is silently dropped.
void
Reactor::handle_event(EventHandler &handler, bool read, bool write)
{
    if (_skip_events) {
        return;
    }
    handler.handle_event(read, write);
}

// Body of the reactor thread. Repeats until _done is set:
//   1. poll the selector, passing it the value returned by _tick()
//   2. dispatch the polled events back into this object
//   3. clear _skip_events if it was raised
//   4. if a wakeup was seen, release waiting token owners and clear the flag
void
Reactor::event_loop()
{
    for (;;) {
        if (_done) {
            break;
        }
        const auto tick_result = _tick();
        _selector.poll(tick_result);
        _selector.dispatch(*this);
        if (_skip_events) {
            _skip_events = false;
        }
        if (_was_woken) {
            release_tokens();
            _was_woken = false;
        }
    }
}

//-----------------------------------------------------------------------------

// Sets up an idle reactor (flags cleared, counters at zero), takes ownership
// of the 'tick' callback and starts _thread running event_loop() on this
// object.
//
// NOTE(review): members are initialized in declaration order (header not
// visible here), not in the order written below. _thread is listed last;
// confirm it is also declared last so the loop never sees uninitialized state.
Reactor::Reactor(std::function<int()> tick)
    : _selector(),
      _tick(std::move(tick)),
      _done(false),
      _was_woken(false),
      _skip_events(false),
      _lock(),
      _cond(),
      _sync_seq(0),
      _wait_cnt(0),
      _token_cnt(0),
      _thread(&Reactor::event_loop, this)
{
}

// Shuts the reactor down. All tokens must already have been destroyed; this
// is checked with an assert on _token_cnt.
Reactor::~Reactor()
{
    assert(_token_cnt == 0);
    // Ask the event loop to stop, wake the selector, then wait for the
    // thread to finish.
    _done = true;
    _selector.wakeup();
    _thread.join();
}

// Creates a Token for 'fd' bound to this reactor and 'handler', with the given
// initial read/write flags, and returns it wrapped in a Token::UP.
Reactor::Token::UP
Reactor::attach(EventHandler &handler, int fd, bool read, bool write)
{
    Token::UP token(new Token(*this, handler, fd, read, write));
    return token;
}

//-----------------------------------------------------------------------------

} // namespace vespalib::portal