aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/mergethrottler.h
blob: 997477a4b70fdb548dbfdc7dcda06041e8f818ca (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class storage::MergeThrottler
 * @ingroup storageserver
 *
 * @brief Throttler and forwarder of merge commands
 */
#pragma once

#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/distributor/messageguard.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/valuemetric.h>
#include <vespa/metrics/metrictimer.h>
#include <vespa/config/config.h>
#include <chrono>

namespace storage {

class AbortBucketOperationsCommand;

class MergeThrottler : public framework::Runnable,
                       public StorageLink,
                       public framework::HtmlStatusReporter,
                       private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>
{
public:
    /**
     * Metric set holding one counter per merge failure category, together
     * with a sum metric.
     *
     * NOTE(review): the constructor is defined out of line, so metric names,
     * descriptions and which counters are added to `sum` cannot be confirmed
     * from this header.
     */
    class MergeFailureMetrics : public metrics::MetricSet {
    public:
        // Sum metric over LongCountMetric values; presumably totals the
        // individual failure counters declared below — TODO confirm.
        metrics::SumMetric<metrics::LongCountMetric> sum;
        // Per-category failure counters. NOTE(review): the names suggest they
        // mirror api::ReturnCode result categories — confirm against
        // updateOperationMetrics() in the .cpp file.
        metrics::LongCountMetric notready;
        metrics::LongCountMetric timeout;
        metrics::LongCountMetric aborted;
        metrics::LongCountMetric wrongdistribution;
        metrics::LongCountMetric bucketnotfound;
        metrics::LongCountMetric busy;
        metrics::LongCountMetric exists;
        metrics::LongCountMetric rejected;
        metrics::LongCountMetric other;

        // `owner` is presumably the parent metric set this set registers
        // with — confirm against metrics::MetricSet.
        MergeFailureMetrics(metrics::MetricSet* owner);
        ~MergeFailureMetrics() override;
    };

    /**
     * Metric set describing the outcome of one category of merge operation:
     * a success counter plus a nested MergeFailureMetrics set.
     *
     * Two instances exist in Metrics (`chaining` and `local`).
     */
    class MergeOperationMetrics : public metrics::MetricSet {
    public:
        // Counter for successful operations (by name; the update site is in
        // the .cpp file).
        metrics::LongCountMetric ok;
        // Breakdown of failed operations.
        MergeFailureMetrics failures;

        // `name` is presumably the metric set name and `owner` the parent
        // set — confirm against metrics::MetricSet.
        MergeOperationMetrics(const std::string& name, metrics::MetricSet* owner);
        ~MergeOperationMetrics() override;
    };

    /**
     * Top-level metric set for the merge throttler.
     *
     * Contains queue/window level metrics and two MergeOperationMetrics sets,
     * one named `chaining` and one named `local`.
     */
    class Metrics : public metrics::MetricSet {
    public:
        // NOTE(review): presumably fed from the MetricTimer stored with each
        // queued merge — confirm in the .cpp file.
        metrics::DoubleAverageMetric averageQueueWaitingTime;
        metrics::LongValueMetric queueSize;
        metrics::LongValueMetric active_window_size;
        metrics::LongCountMetric bounced_due_to_back_pressure;
        MergeOperationMetrics chaining;
        MergeOperationMetrics local;

        // `owner` may be omitted, in which case a null owner is passed on.
        // Uses nullptr rather than the literal 0 as the null pointer constant.
        Metrics(metrics::MetricSet* owner = nullptr);
        // Marked override for consistency with the sibling metric sets, which
        // already override the MetricSet destructor.
        ~Metrics() override;
    };

private:
    // TODO: make PQ with stable ordering into own, generic class
    /**
     * Pairs a queued message with an insertion sequence number so that
     * messages with equal priority keep their insertion (FIFO) order when
     * stored in an ordered container.
     *
     * MessageType must be pointer-like: it is dereferenced with `*` and `->`
     * and the pointee must provide getPriority() and operator==.
     */
    template <class MessageType>
    struct StablePriorityOrderingWrapper {
        MessageType _msg;
        // Default-constructed when the wrapper is created.
        metrics::MetricTimer _startTimer;
        uint64_t _sequence;

        StablePriorityOrderingWrapper(const MessageType& msg, uint64_t sequence)
            : _msg(msg), _startTimer(), _sequence(sequence)
        {
        }

        // Equal iff the wrapped messages compare equal AND the sequence
        // numbers match.
        bool operator==(const StablePriorityOrderingWrapper& other) const {
            return (*_msg == *other._msg
                    && _sequence == other._sequence);
        }

        /**
         * Orders primarily by ascending getPriority() value and secondarily
         * by ascending sequence number.
         *
         * The sequence number must only be consulted when the priorities are
         * equal. Falling through to the sequence comparison when this
         * priority is greater than the other's would let both `a < b` and
         * `b < a` hold (e.g. a = {prio 5, seq 1}, b = {prio 3, seq 2}), which
         * violates the strict weak ordering that std::set requires.
         */
        bool operator<(const StablePriorityOrderingWrapper& other) const {
            const auto lhsPriority = _msg->getPriority();
            const auto rhsPriority = other._msg->getPriority();
            if (lhsPriority < rhsPriority) {
                return true;
            }
            if (rhsPriority < lhsPriority) {
                return false;
            }
            return (_sequence < other._sequence);
        }
    };

    /**
     * Per-bucket bookkeeping for a merge tracked by the throttler: the
     * command itself (when held), a printable copy of it, the cluster state
     * version and a set of status flags.
     */
    struct ChainedMergeState {
        api::StorageMessage::SP _cmd;
        // Textual form of the command, kept separately so the merge can
        // still be printed when the command itself is not held.
        std::string _cmdString;
        uint64_t _clusterStateVersion;
        bool _inCycle;
        bool _executingLocally;
        bool _unwinding;
        bool _cycleBroken;
        bool _aborted;

        ChainedMergeState();
        ChainedMergeState(const api::StorageMessage::SP& cmd, bool executing = false);
        ~ChainedMergeState();
        // Copy construction and copy assignment are deliberately left to the
        // compiler-generated defaults.

        // ---- merge command -------------------------------------------------

        const api::StorageMessage::SP& getMergeCmd() const {
            return _cmd;
        }

        // Stores the command. The cached string is refreshed only for a
        // non-null command; a null command leaves the previous string intact.
        void setMergeCmd(const api::StorageMessage::SP& cmd) {
            _cmd = cmd;
            if (cmd.get() == nullptr) {
                return;
            }
            _cmdString = cmd->toString();
        }

        const std::string& getMergeCmdString() const {
            return _cmdString;
        }

        // ---- status flags --------------------------------------------------

        bool isExecutingLocally() const {
            return _executingLocally;
        }
        void setExecutingLocally(bool execLocally) {
            _executingLocally = execLocally;
        }

        bool isInCycle() const {
            return _inCycle;
        }
        void setInCycle(bool inCycle) {
            _inCycle = inCycle;
        }

        bool isUnwinding() const {
            return _unwinding;
        }
        void setUnwinding(bool unwinding) {
            _unwinding = unwinding;
        }

        bool isCycleBroken() const {
            return _cycleBroken;
        }
        void setCycleBroken(bool cycleBroken) {
            _cycleBroken = cycleBroken;
        }

        bool isAborted() const {
            return _aborted;
        }
        void setAborted(bool aborted) {
            _aborted = aborted;
        }
    };

    // Tracked merges, keyed by bucket.
    using ActiveMergeMap = std::map<document::Bucket, ChainedMergeState>;

    // An ordered set is used in place of std::priority_queue because status
    // rendering needs to iterate over the queued merges.
    using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>;

    // State of the thread rendezvous stored in _rendezvous.
    // NOTE(review): presumably stepped through by rendezvousWithWorkerThread(),
    // handleRendezvous() and releaseWorkerThreadRendezvous() in the order
    // NONE -> REQUESTED -> ESTABLISHED -> RELEASED — confirm in the .cpp file.
    enum RendezvousState {
        RENDEZVOUS_NONE,
        RENDEZVOUS_REQUESTED,
        RENDEZVOUS_ESTABLISHED,
        RENDEZVOUS_RELEASED
    };

    // Merges currently tracked by the throttler, keyed by bucket
    // (exposed to tests through getActiveMerges()).
    ActiveMergeMap _merges;
    // Merges waiting in the queue (exposed to tests through getMergeQueue()).
    MergePriorityQueue _queue;
    // Returned by getMaxQueueSize(); presumably the upper bound for _queue —
    // TODO confirm where it is enforced.
    std::size_t _maxQueueSize;
    mbus::StaticThrottlePolicy::UP _throttlePolicy;
    uint64_t _queueSequence; // TODO: move into a stable priority queue class
    // NOTE(review): presumably _messageLock/_messageCond protect and signal
    // _messagesDown/_messagesUp — confirm in the .cpp file.
    mutable std::mutex _messageLock;
    std::condition_variable _messageCond;
    // See apply_timed_backpressure() for a locking caveat involving this lock.
    mutable std::mutex _stateLock;
    config::ConfigFetcher _configFetcher;
    // Messages pending to be processed by the worker thread
    std::vector<api::StorageMessage::SP> _messagesDown;
    std::vector<api::StorageMessage::SP> _messagesUp;
    std::unique_ptr<Metrics> _metrics;
    StorageComponent _component;
    framework::Thread::UP _thread;
    RendezvousState _rendezvous;
    // Declared mutable, i.e. may be modified from const member functions.
    // NOTE(review): presumably the point in time until which backpressure
    // bouncing stays active — confirm.
    mutable std::chrono::steady_clock::time_point _throttle_until_time;
    std::chrono::steady_clock::duration _backpressure_duration;
    bool _disable_queue_limits_for_chained_merges;
    bool _closing;
public:
    /**
     * Constructs the throttler from a config URI and a storage component
     * register.
     *
     * NOTE(review): this comment previously described a `windowSizeIncrement`
     * argument (for letting unit tests start out with a window size above 1);
     * the constructor declared here has no such parameter.
     */
    MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
    ~MergeThrottler() override;

    /** Implements framework::Runnable::run */
    void run(framework::ThreadHandle&) override;

    // Overrides of base class hooks (declared `override`; presumably from
    // StorageLink — confirm).
    void onOpen() override;
    void onClose() override;
    void onFlush(bool downwards) override;
    bool onUp(const std::shared_ptr<api::StorageMessage>& msg) override;
    bool onDown(const std::shared_ptr<api::StorageMessage>& msg) override;

    bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override;

    /*
     * When invoked, merges to the node will be BUSY-bounced by the throttler
     * for a configurable period of time instead of being processed.
     *
     * Thread safe, but must not be called if _stateLock is already held, or
     * deadlock will occur.
     */
    void apply_timed_backpressure();
    // NOTE(review): a private backpressure_mode_active_no_lock() variant
    // exists, so this one presumably acquires _stateLock itself and would
    // then share the deadlock caveat above — confirm in the .cpp file.
    bool backpressure_mode_active() const;

    // For unit testing only. Returns a reference to the internal map; no
    // locking is performed by this accessor.
    const ActiveMergeMap& getActiveMerges() const { return _merges; }
    // For unit testing only. Returns a reference to the internal queue; no
    // locking is performed by this accessor.
    const MergePriorityQueue& getMergeQueue() const { return _queue; }
    // For unit testing only
    const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
    mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
    void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
    // For unit testing only
    std::mutex& getStateLock() { return _stateLock; }

    // Dereferences _metrics, which therefore must be non-null when called.
    Metrics& getMetrics() { return *_metrics; }
    std::size_t getMaxQueueSize() const { return _maxQueueSize; }
    void print(std::ostream& out, bool verbose, const std::string& indent) const override;
    void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
private:
    // Befriended so that the guard, which is defined in the .cpp file, can
    // access private members of MergeThrottler.
    friend class ThreadRendezvousGuard; // impl in .cpp file

    // Simple helper class for centralizing chaining logic
    //
    // Holds a *reference* to the merge command, so an instance must not
    // outlive the command it was created from.
    struct MergeNodeSequence {
        const api::MergeBucketCommand& _cmd;
        std::vector<api::MergeBucketCommand::Node> _sortedNodes;
        std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
        const uint16_t _thisIndex; // Index of the current storage node

        MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);

        std::size_t getSortedIndex() const { return _sortedIndex; }
        const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
            return _sortedNodes;
        }
        // The maximum std::size_t value is used as the "unknown" sentinel for
        // _sortedIndex. NOTE(review): presumably set by the constructor when
        // this node is not part of the command's node set — confirm.
        bool isIndexUnknown() const {
            return (_sortedIndex == std::numeric_limits<std::size_t>::max());
        }
        /**
         * This node is the merge executor if it's the first element in the
         * _unsorted_ node sequence.
         *
         * Indexes element 0 of _cmd.getNodes() without a size check, so the
         * command must carry at least one node.
         */
        bool isMergeExecutor() const {
            return (_cmd.getNodes()[0].index == _thisIndex);
        }
        // Same precondition as isMergeExecutor(): getNodes() must be non-empty.
        uint16_t getExecutorNodeIndex() const{
            return _cmd.getNodes()[0].index;
        }
        // Note: with an empty _sortedNodes, size() - 1 wraps around to the
        // maximum std::size_t value, which is also the "unknown" sentinel
        // checked by isIndexUnknown().
        bool isLastNode() const {
            return (_sortedIndex == _sortedNodes.size() - 1);
        }
        bool chainContainsIndex(uint16_t idx) const;
        uint16_t getThisNodeIndex() const { return _thisIndex; }
        /**
         * Gets node to forward to in strictly increasing order.
         */
        uint16_t getNextNodeInChain() const;

        /**
         * Returns true iff the chain vector (which is implicitly sorted)
         * pairwise compares equally to the vector of sorted node indices
         */
        bool isChainCompleted() const;
    };

    /**
     * Callback method for config system (IFetcherCallback)
     *
     * Takes ownership of the new config instance (passed as std::unique_ptr).
     */
    void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override;

    // NOTE: unless explicitly specified, all the below functions require
    // the throttler's lock to be held upon call (usually implicitly via
    // MessageGuard).
    // NOTE(review): this comment used to name a `_sync` lock, which is not a
    // member of this class; presumably _stateLock is meant — confirm.

    void handleMessageDown(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard);
    void handleMessageUp(const std::shared_ptr<api::StorageMessage>& msg, MessageGuard& msgGuard);

    /**
     * Handle the receipt of a MergeBucketReply, be it from another node
     * or from the persistence layer on the current node itself. In the
     * case of the former, fromPersistenceLayer must be false, since we have
     * to generate a new reply to pass back to the unwind chain. In
     * case of the latter, fromPersistenceLayer must be true since the
     * reply from the persistence layer will be automatically sent
     * back in the chain.
     */
    void processMergeReply(
            const std::shared_ptr<api::StorageMessage>& msg,
            bool fromPersistenceLayer,
            MessageGuard& msgGuard);

    /**
     * Validate that the merge command is consistent with our current
     * state.
     * @param mergeCmd the merge command to validate
     * @param nodeSeq node sequence helper for the command
     * @param msgGuard guard on which a rejection reply is sent on failure
     * @return true if message is valid and may be further processed.
     * If false is returned, a rejection reply will have been sent up
     * on the message guard.
     */
    bool validateNewMerge(
            const api::MergeBucketCommand& mergeCmd,
            const MergeNodeSequence& nodeSeq,
            MessageGuard& msgGuard) const;
    /**
     * Register a new merge bucket command with the internal state and
     * either forward or execute it, depending on where the current node
     * is located in the merge chain.
     *
     * Precondition: no existing merge state exists for msg's bucketid.
     */
    void processNewMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);

    /**
     * Precondition: an existing merge state exists for msg's bucketid.
     * @return true if message was handled, false otherwise (see onUp/onDown).
     */
    bool processCycledMergeCommand(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);

    /**
     * Forwards the given MergeBucketCommand to the storage node given
     * by nodeIndex. New forwarded message will inherit mergeCmd's priority.
     * The current node's index will be added to the end of the merge
     * chain vector.
     */
    void forwardCommandToNode(
            const api::MergeBucketCommand& mergeCmd,
            uint16_t nodeIndex,
            MessageGuard& msgGuard);

    // Takes an iterator into the active merge map (ActiveMergeMap).
    void removeActiveMerge(ActiveMergeMap::iterator);

    /**
     * Gets (and pops) the highest priority merge waiting in the queue,
     * if one exists.
     * @return Highest priority waiting merge or null SP if queue is empty
     */
    api::StorageMessage::SP getNextQueuedMerge();
    void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);

    /**
     * @return true if throttle policy says at least one additional
     * merge can be processed.
     */
    bool canProcessNewMerge() const;

    // Backpressure helpers. See apply_timed_backpressure() for the public
    // entry point and its description of BUSY-bouncing.
    bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
    void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
    bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
    // NOTE(review): by name, the variant of backpressure_mode_active() for
    // callers that already hold the relevant lock — confirm.
    bool backpressure_mode_active_no_lock() const;
    void backpressure_bounce_all_queued_merges(MessageGuard& guard);
    bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept;

    // Takes the metric set to account the result against by non-const
    // reference even though the function itself is const.
    void sendReply(const api::MergeBucketCommand& cmd,
                   const api::ReturnCode& result,
                   MessageGuard& msgGuard,
                   MergeOperationMetrics& metrics) const;

    /**
     * @return true if a merge for msg's bucketid is already registered
     * in the internal merge throttler state.
     */
    bool isMergeAlreadyKnown(const api::StorageMessage::SP& msg) const;

    // NOTE(review): the version parameters here and below are uint32_t while
    // ChainedMergeState::_clusterStateVersion is uint64_t — confirm that the
    // width difference is intentional.
    bool rejectMergeIfOutdated(
            const api::StorageMessage::SP& msg,
            uint32_t rejectLessThanVersion,
            MessageGuard& msgGuard) const;

    /**
     * Immediately reject all queued merges whose cluster state version is
     * less than rejectLessThanVersion.
     */
    void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, uint32_t rejectLessThanVersion);
    bool attemptProcessNextQueuedMerge(MessageGuard& msgGuard);
    bool processQueuedMerges(MessageGuard& msgGuard);
    // Rendezvous helpers; each takes the caller's unique_lock and the
    // condition variable to use. See RendezvousState for the tracked states.
    void handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
    void rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
    void releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
    // Message classification predicates.
    bool isDiffCommand(const api::StorageMessage& msg) const;
    bool isMergeCommand(const api::StorageMessage& msg) const;
    bool isMergeReply(const api::StorageMessage& msg) const;
    bool bucketIsUnknownOrAborted(const document::Bucket& bucket) const;

    // Returns a newly created message for `cmd`.
    // NOTE(review): by name an abort reply carrying `reason` — confirm the
    // return code used in the .cpp file.
    std::shared_ptr<api::StorageMessage> makeAbortReply(
            api::StorageCommand& cmd,
            vespalib::stringref reason) const;

    // Cluster state change handling.
    // NOTE(review): presumably invoked from onSetSystemState() — confirm.
    void handleOutdatedMerges(const api::SetSystemStateCommand&);
    void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
    void markActiveMergesAsAborted(uint32_t minimumStateVersion);

    void update_active_merge_window_size_metric() noexcept;

    // const function, but metrics are mutable
    // (the metric set to update is passed in by non-const reference).
    void updateOperationMetrics(
            const api::ReturnCode& result,
            MergeOperationMetrics& metrics) const;
};

} // namespace storage