aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
blob: 58290620b38747a1120c94f3b367f96597f06892 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <vespa/vespalib/util/time.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>

namespace storage::distributor {

class DistributorStripe;
class DistributorStripePool;
class TickableStripe;

/**
 * A DistributorStripeThread provides threading resources for a single distributor stripe
 * and the means of synchronizing access towards it through a DistributorStripePool.
 *
 * A DistributorStripeThread instance is bidirectionally bound to a particular pool and
 * should therefore always be created by the pool itself (never standalone).
 */
class DistributorStripeThread {
    using AtomicDuration = std::atomic<vespalib::duration>;

    TickableStripe&         _stripe;
    DistributorStripePool&  _stripe_pool;
    AtomicDuration          _tick_wait_duration;
    std::mutex              _mutex;
    std::condition_variable _event_cond;
    std::condition_variable _park_cond;
    std::atomic<uint32_t>   _ticks_before_wait;
    std::atomic<bool>       _should_park;
    std::atomic<bool>       _should_stop;
    bool                    _waiting_for_event;

    friend class DistributorStripePool;
public:
    DistributorStripeThread(TickableStripe& stripe,
                            DistributorStripePool& stripe_pool);
    ~DistributorStripeThread();

    void run();

    // Wakes up stripe thread if it's currently waiting for an external event to be triggered,
    // such as the arrival of a new RPC message. If thread is parked this call will have no
    // effect.
    void notify_event_has_triggered() noexcept;

    void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
    void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;

    TickableStripe*       operator->() noexcept       { return &_stripe; }
    const TickableStripe* operator->() const noexcept { return &_stripe; }

    TickableStripe& stripe() noexcept             { return _stripe; }
    const TickableStripe& stripe() const noexcept { return _stripe; }
private:
    [[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
        return _should_stop.load(std::memory_order_relaxed);
    }

    [[nodiscard]] bool should_park_relaxed() const noexcept {
        return _should_park.load(std::memory_order_relaxed);
    }

    [[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept {
        return _tick_wait_duration.load(std::memory_order_relaxed);
    }

    [[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept {
        return _ticks_before_wait.load(std::memory_order_relaxed);
    }

    void signal_wants_park() noexcept;
    void unpark_thread() noexcept;
    void wait_until_event_notified_or_timed_out() noexcept;
    void wait_until_unparked() noexcept;

    void signal_should_stop() noexcept;
};

}