aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/statemanager.h
blob: 72b89dc4d65cfcb3a127c4be8bd0f2d24c98cfae (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class storage::StateManager
 * @ingroup storageserver
 *
 * @brief Keeps and updates node and system states.
 *
 * This component implements the NodeStateUpdater interface to handle states
 * for all components. See that interface for documentation.
 *
 * In addition, this manager is a storage link such that it can handle the
 * various commands for setting and retrieving states.
 */
#pragma once

#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/objects/floatingpointtype.h>
#include <deque>
#include <map>
#include <unordered_set>
#include <list>
#include <atomic>

namespace metrics {
    class MetricManager;
}

namespace storage {

namespace lib { class ClusterStateBundle; }

class StateManager : public NodeStateUpdater,
                     public StorageLink,
                     public framework::HtmlStatusReporter,
                     private framework::Runnable,
                     private vespalib::JsonStreamTypes
{
    using ClusterStateBundle = lib::ClusterStateBundle;
    using TimeStateCmdPair   = std::pair<vespalib::steady_time, api::GetNodeStateCommand::SP>;
    using TimeSysStatePair   = std::pair<vespalib::steady_time, std::shared_ptr<const ClusterStateBundle>>;

    struct StateManagerMetrics;

    StorageComponent                          _component;
    const NodeStateReporter &                 _nodeStateReporter;
    std::unique_ptr<StateManagerMetrics>      _metrics;
    mutable std::mutex                        _stateLock;
    std::condition_variable                   _stateCond;
    std::mutex                                _listenerLock;
    std::shared_ptr<lib::NodeState>           _nodeState;
    std::shared_ptr<lib::NodeState>           _nextNodeState;
    std::shared_ptr<const ClusterStateBundle> _systemState;
    std::shared_ptr<const ClusterStateBundle> _nextSystemState;
    uint32_t                                  _reported_host_info_cluster_state_version;
    std::list<StateListener*>                 _stateListeners;
    std::list<TimeStateCmdPair>               _queuedStateRequests;
    mutable std::mutex                        _threadLock;
    std::condition_variable                   _threadCond;
    std::deque<TimeSysStatePair>              _systemStateHistory;
    uint32_t                                  _systemStateHistorySize;
    const vespalib::steady_time               _start_time;
    std::optional<vespalib::steady_time>      _health_ping_time;
    vespalib::duration                        _health_ping_warn_interval;
    vespalib::steady_time                     _health_ping_warn_time;
    std::unique_ptr<HostInfo>                 _hostInfo;
    std::unique_ptr<framework::Thread>        _thread;
    // Controllers that have observed a GetNodeState response sent _after_
    // immediately_send_get_node_state_replies() has been invoked.
    std::unordered_set<uint16_t>              _controllers_observed_explicit_node_state;
    bool                                      _noThreadTestMode;
    bool                                      _grabbedExternalLock;
    std::atomic<bool>                         _notifyingListeners;
    std::atomic<bool>                         _requested_almost_immediate_node_state_replies;

public:
    explicit StateManager(StorageComponentRegister&, std::unique_ptr<HostInfo>,
                          const NodeStateReporter & reporter, bool testMode);
    ~StateManager() override;

    void onOpen() override;
    void onClose() override;

    void tick();
    void warn_on_missing_health_ping();

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;
    void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;

    lib::NodeState::CSP getReportedNodeState() const override;
    lib::NodeState::CSP getCurrentNodeState() const override;
    std::shared_ptr<const ClusterStateBundle> getClusterStateBundle() const override;

    void addStateListener(StateListener&) override;
    void removeStateListener(StateListener&) override;

    Lock::SP grabStateChangeLock() override;
    void setReportedNodeState(const lib::NodeState& state) override;
    void setClusterStateBundle(const ClusterStateBundle& c);
    HostInfo& getHostInfo() { return *_hostInfo; }

    void immediately_send_get_node_state_replies() override;
    void request_almost_immediate_node_state_replies() override;

private:
    struct ExternalStateLock;
    friend struct StateManagerTest;

    void notifyStateListeners();
    bool sendGetNodeStateReplies();
    bool sendGetNodeStateReplies(vespalib::steady_time olderThanTime);
    bool sendGetNodeStateReplies(uint16_t nodeIndex);
    bool sendGetNodeStateReplies(vespalib::steady_time olderThanTime, uint16_t nodeIndex);
    void mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex> &, uint16_t controller_index);

    lib::Node thisNode() const;

    /**
     * Overwrite the current cluster state with the one that is currently
     * pending.
     *
     * Appends the pending cluster state to a circular buffer of historic
     * states.
     *
     * Preconditions:
     *   - _stateLock is held
     *   - _systemState.get() != nullptr
     *   - _nextSystemState.get() != nullptr
     * Postconditions:
     *   - _systemState = old(_nextSystemState)
     *   - _nextSystemState.get() == nullptr
     */
    void enableNextClusterState();

    /**
     * Log this node's state transition as given by the cluster state iff the
     * state differs between currentState and newState.
     */
    void logNodeClusterStateTransition(
            const ClusterStateBundle& currentState,
            const ClusterStateBundle& newState) const;

    bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override;
    bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
    bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override;

    /**
     * _stateLock MUST NOT be held while calling.
     */
    std::string getNodeInfo() const;

    void run(framework::ThreadHandle&) override;

    void clear_controllers_observed_explicit_node_state_vector();
};

} // storage