aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/pendingmessagetracker.h
blob: 736f2918401d8bf0380d7178a489865c8574dabf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "nodeinfo.h"
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageframework/generic/component/component.h>
#include <vespa/storageframework/generic/component/componentregister.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/identity.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <type_traits>
#include <unordered_map>
#include <vector>

namespace storage::distributor {

/**
 * Since the state a deferred task depends on may have changed between the
 * time a task was scheduled and when it is actually executed, this enum tells
 * the task whether it should be started as normal.
 */
enum class TaskRunState {
    OK,        // Task may be started as normal
    Aborted,   // Task should trigger an immediate abort behavior (distributor is shutting down)
    BucketLost // Task should trigger an immediate abort behavior (bucket no longer present on node)
};

/**
 * Represents an arbitrary task whose execution may be deferred until no
 * further pending operations are present.
 */
struct DeferredTask {
    virtual ~DeferredTask() = default;
    /** Executes the task; `state` tells it whether to proceed normally or abort. */
    virtual void run(TaskRunState state) = 0;
};

/**
 * DeferredTask adapter that owns a callable and invokes it with the
 * TaskRunState passed to run().
 *
 * The callable is moved in when given as an rvalue and copied when given as an
 * lvalue. The adapter itself is neither copyable nor movable;
 * make_deferred_task() hands it out as a std::unique_ptr<DeferredTask>.
 */
template <typename Func>
class LambdaDeferredTask : public DeferredTask {
    Func _func;
public:
    explicit LambdaDeferredTask(Func&& f) : _func(std::move(f)) {}
    // In this class template `Func&&` is a plain rvalue reference, not a
    // forwarding reference. Without this overload, building a task from a named
    // (lvalue) callable, e.g. make_deferred_task(my_lambda), fails to compile.
    explicit LambdaDeferredTask(const Func& f) : _func(f) {}
    LambdaDeferredTask(const LambdaDeferredTask&) = delete;
    LambdaDeferredTask(LambdaDeferredTask&&) = delete;
    ~LambdaDeferredTask() override = default;

    void run(TaskRunState state) override {
        _func(state);
    }
};

/**
 * Wraps any callable invocable as `f(TaskRunState)` in a heap-allocated
 * DeferredTask.
 *
 * @param f callable; rvalues are moved into the task, lvalues are copied.
 * @return owning pointer to the new task.
 */
template <typename Func>
std::unique_ptr<DeferredTask> make_deferred_task(Func&& f) {
    return std::make_unique<LambdaDeferredTask<std::decay_t<Func>>>(std::forward<Func>(f));
}

class PendingMessageTracker : public framework::HtmlStatusReporter {
public:
    class Checker {
    public:
        virtual ~Checker() = default;
        virtual bool check(uint32_t messageType, uint16_t node, uint8_t priority) = 0;
    };

    using TimePoint = vespalib::system_time;

    PendingMessageTracker(framework::ComponentRegister&, uint32_t stripe_index);
    ~PendingMessageTracker() override;

    void insert(const std::shared_ptr<api::StorageMessage>&);
    document::Bucket reply(const api::StorageReply& reply);
    void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;

    void print(std::ostream& out, bool verbose, const std::string& indent) const;

    /**
     * Goes through each pending message for the given node+bucket pair,
     * passing it to the given type checker.
     * Breaks when the checker returns false.
     */
    void checkPendingMessages(uint16_t node, const document::Bucket& bucket, Checker& checker) const;

    /**
     * Goes through each pending message (across all nodes) for the given bucket
     * and invokes the given checker with the node, message type and priority.
     * Breaks when the checker returns false.
     */
    void checkPendingMessages(const document::Bucket& bucket, Checker& checker) const;

    /**
     * Utility function for checking if there's a message of type
     * messageType pending to bucket bid on the given node.
     */
    bool hasPendingMessage(uint16_t node, const document::Bucket& bucket, uint32_t messageType) const;

    /**
     * Returns a vector containing the number of pending messages to each storage node.
     * The vector might be smaller than a given node index. In that case, that storage
     * node has never had any pending messages.
     */
    const NodeInfo& getNodeInfo() const noexcept { return _nodeInfo; }
    NodeInfo& getNodeInfo() noexcept { return _nodeInfo; }

    /**
     * Clears all pending messages for the given node, and returns
     * the messages erased.
     */
    std::vector<uint64_t> clearMessagesForNode(uint16_t node);

    void setNodeBusyDuration(vespalib::duration duration) noexcept {
        _nodeBusyDuration = duration;
    }

    void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task);
    void abort_deferred_tasks();

    /**
     * For each distinct bucket with at least one pending message towards it:
     *
     * Iff `bucket_predicate(bucket) == true`, `msg_id_callback` is invoked once for _each_
     * message towards `bucket`, with the message ID as the argument.
     *
     * Note: `bucket_predicate` is only invoked once per distinct bucket.
     */
    void enumerate_matching_pending_bucket_ops(const std::function<bool(const document::Bucket&)>& bucket_predicate,
                                               const std::function<void(uint64_t)>& msg_id_callback) const;
private:
    struct MessageEntry {
        TimePoint        timeStamp;
        uint32_t         msgType;
        uint32_t         priority;
        uint64_t         msgId;
        document::Bucket bucket;
        uint16_t         nodeIdx;

        MessageEntry(TimePoint timeStamp, uint32_t msgType, uint32_t priority,
                     uint64_t msgId, document::Bucket bucket, uint16_t nodeIdx) noexcept;
        [[nodiscard]] vespalib::string toHtml() const;
    };

    struct MessageIdKey : boost::multi_index::member<MessageEntry, uint64_t, &MessageEntry::msgId> {};

    /**
     * Each entry has a separate composite keyed index on node+bucket id+type.
     * This makes it efficient to find all messages for a node, for a bucket
     * on that node and specific message types to an exact bucket on the node.
     */
    struct CompositeNodeBucketKey
        : boost::multi_index::composite_key<
              MessageEntry,
              boost::multi_index::member<MessageEntry, uint16_t, &MessageEntry::nodeIdx>,
              boost::multi_index::member<MessageEntry, document::Bucket, &MessageEntry::bucket>,
              boost::multi_index::member<MessageEntry, uint32_t, &MessageEntry::msgType>
          >
    {
    };

    struct CompositeBucketMsgNodeKey
        : boost::multi_index::composite_key<
              MessageEntry,
              boost::multi_index::member<MessageEntry, document::Bucket, &MessageEntry::bucket>,
              boost::multi_index::member<MessageEntry, uint32_t, &MessageEntry::msgType>,
              boost::multi_index::member<MessageEntry, uint16_t, &MessageEntry::nodeIdx>
          >
    {
    };

    using Messages = boost::multi_index::multi_index_container <
        MessageEntry,
        boost::multi_index::indexed_by<
            boost::multi_index::ordered_unique<MessageIdKey>,
            boost::multi_index::ordered_non_unique<CompositeNodeBucketKey>,
            boost::multi_index::ordered_non_unique<CompositeBucketMsgNodeKey>
        >
    >;

    // Must match Messages::nth_index<N>
    static constexpr uint32_t IndexByMessageId     = 0;
    static constexpr uint32_t IndexByNodeAndBucket = 1;
    static constexpr uint32_t IndexByBucketAndType = 2;

    using DeferredBucketTaskMap   = std::unordered_multimap<
            document::Bucket,
            std::unique_ptr<DeferredTask>,
            document::Bucket::hash
        >;

    Messages                   _messages;
    framework::Component       _component;
    NodeInfo                   _nodeInfo;
    vespalib::duration         _nodeBusyDuration;
    DeferredBucketTaskMap      _deferred_read_tasks;
    mutable std::atomic<bool>  _trackTime;

    // Since distributor is currently single-threaded, this will only
    // contend when status page is being accessed. It is, however, required
    // to be present for that exact purpose.
    mutable std::mutex _lock;

    void getStatusStartPage(std::ostream& out) const;
    void getStatusPerNode(std::ostream& out) const;
    void getStatusPerBucket(std::ostream& out) const;
    TimePoint currentTime() const;

    [[nodiscard]] bool bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept;
    std::vector<std::unique_ptr<DeferredTask>> get_deferred_ops_if_bucket_writes_drained(const document::Bucket&);
};

}