blob: 8b9453ab3f3f8576643de5dcce488f1741091239 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include <vespa/vespalib/util/time.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
namespace storage::distributor {
class DistributorStripe;
class DistributorStripePool;
class TickableStripe;
/**
* A DistributorStripeThread provides threading resources for a single distributor stripe
* and the means of synchronizing access towards it through a DistributorStripePool.
*
* A DistributorStripeThread instance is bidirectionally bound to a particular pool and
* should therefore always be created by the pool itself (never standalone).
*/
class DistributorStripeThread {
using AtomicDuration = std::atomic<vespalib::duration>;
TickableStripe& _stripe;
DistributorStripePool& _stripe_pool;
AtomicDuration _tick_wait_duration;
std::mutex _mutex;
std::condition_variable _event_cond;
std::condition_variable _park_cond;
std::atomic<uint32_t> _ticks_before_wait;
std::atomic<bool> _should_park;
std::atomic<bool> _should_stop;
bool _waiting_for_event;
friend class DistributorStripePool;
public:
DistributorStripeThread(TickableStripe& stripe,
DistributorStripePool& stripe_pool);
~DistributorStripeThread();
void run();
// Wakes up stripe thread if it's currently waiting for an external event to be triggered,
// such as the arrival of a new RPC message. If thread is parked this call will have no
// effect.
void notify_event_has_triggered() noexcept;
void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
TickableStripe* operator->() noexcept { return &_stripe; }
const TickableStripe* operator->() const noexcept { return &_stripe; }
TickableStripe& stripe() noexcept { return _stripe; }
const TickableStripe& stripe() const noexcept { return _stripe; }
private:
[[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
return _should_stop.load(std::memory_order_relaxed);
}
[[nodiscard]] bool should_park_relaxed() const noexcept {
return _should_park.load(std::memory_order_relaxed);
}
[[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept {
return _tick_wait_duration.load(std::memory_order_relaxed);
}
[[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept {
return _ticks_before_wait.load(std::memory_order_relaxed);
}
void signal_wants_park() noexcept;
void unpark_thread() noexcept;
void wait_until_event_notified_or_timed_out() noexcept;
void wait_until_unparked() noexcept;
void signal_should_stop() noexcept;
};
}
|