// path: storage/src/vespa/storage/distributor/distributor.h (blob 84e195fdff2312b5be18a44a5f17a26e50a10450)
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "bucket_spaces_stats_provider.h"
#include "bucketdbupdater.h"
#include "distributor_host_info_reporter.h"
#include "distributorinterface.h"
#include "externaloperationhandler.h"
#include "idealstatemanager.h"
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
#include <vespa/vespalib/util/sync.h>
#include <queue>
#include <unordered_map>

namespace storage {

struct DoneInitializeHandler;
class HostInfo;

namespace distributor {

class DistributorBucketSpaceRepo;
class SimpleMaintenanceScanner;
class BlockingOperationStarter;
class ThrottlingOperationStarter;
class BucketPriorityDatabase;
class OwnershipTransferSafeTimePointCalculator;

/**
 * Storage chain link that implements the distributor-facing interfaces named
 * in its base clause: DistributorInterface, StatusDelegator,
 * framework::StatusReporter, framework::TickingThread, MinReplicaProvider and
 * BucketSpacesStatsProvider.
 *
 * The object owns (as direct members) the bucket space repos, the pending
 * message tracker, the bucket DB updater, the ideal state manager, the
 * external operation handler and the maintenance scanning/scheduling
 * components declared in the private section below.
 *
 * NOTE(review): member declaration order determines construction order;
 * several members are presumably constructed from earlier ones, so do not
 * reorder the data members without checking the constructor.
 */
class Distributor : public StorageLink,
                    public DistributorInterface,
                    public StatusDelegator,
                    public framework::StatusReporter,
                    public framework::TickingThread,
                    public MinReplicaProvider,
                    public BucketSpacesStatsProvider
{
public:
    /**
     * @param manageActiveBucketCopies  see constructor implementation for how
     *                                  this flag is applied (not visible here).
     * @param hostInfoReporterRegistrar host info object handed to the distributor.
     * @param messageSender (last, unnamed parameter) optional sender; when
     *        left as nullptr, getMessageSender() returns this object itself.
     */
    Distributor(DistributorComponentRegister&,
                framework::TickingThreadPool&,
                DoneInitializeHandler&,
                bool manageActiveBucketCopies,
                HostInfo& hostInfoReporterRegistrar,
                ChainedMessageSender* = nullptr);

    ~Distributor() override;

    void onOpen() override;
    void onClose() override;
    bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
    void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
    void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
    // Bypasses message tracker component. Thread safe.
    void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&);

    /**
     * @return the explicitly provided message sender if one is set,
     * otherwise this distributor instance.
     */
    ChainedMessageSender& getMessageSender() override {
        return (_messageSender == nullptr ? *this : *_messageSender);
    }

    DistributorMetricSet& getMetrics() override { return *_metrics; }

    PendingMessageTracker& getPendingMessageTracker() override {
        return _pendingMessageTracker;
    }

    BucketOwnership checkOwnershipInPendingState(const document::Bucket &bucket) const override;

    const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;

    /**
     * Enables a new cluster state. Called after the bucket db updater has
     * retrieved all bucket info related to the change.
     */
    void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override;

    /**
     * Invoked when a pending cluster state for a distribution (config)
     * change has been enabled. An invocation of storageDistributionChanged
     * will eventually cause this method to be called, assuming the pending
     * cluster state completed successfully.
     */
    void notifyDistributionChangeEnabled() override;

    void storageDistributionChanged() override;

    void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override;

    bool handleReply(const std::shared_ptr<api::StorageReply>& reply) override;

    // StatusReporter implementation
    vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
    bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;

    bool handleStatusRequest(const DelegatedStatusRequest& request) const override;

    uint32_t pendingMaintenanceCount() const;

    std::string getActiveIdealStateOperations() const;
    std::string getActiveOperations() const;

    // TickingThread implementation
    framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
    framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;

    /**
     * Checks whether a bucket needs to be split, and sends a split
     * if so.
     */
    void checkBucketForSplit(document::BucketSpace bucketSpace,
                             const BucketDatabase::Entry& e,
                             uint8_t priority) override;

    const lib::ClusterStateBundle& getClusterStateBundle() const override;

    /**
     * @return Returns the states in which the distributors consider
     * storage nodes to be up. The returned string is "uri" when
     * initializing nodes count as up, and "ur" otherwise.
     */
    const char* getStorageNodeUpStates() const override {
        return _initializingIsUp ? "uri" : "ur";
    }

    /**
     * Called by bucket db updater after a merge has finished, and all the
     * request bucket info operations have been performed as well. Passes the
     * merge back to the operation that created it.
     */
    void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;


    /** @return true for as long as the done-initializing flag has not been set. */
    bool initializing() const override {
        return !_doneInitializing;
    }

    const DistributorConfiguration& getConfig() const override {
        return _component.getTotalDistributorConfig();
    }

    /** @return true iff the current maintenance scheduling mode is RECOVERY_SCHEDULING_MODE. */
    bool isInRecoveryMode() const {
        return _schedulingMode == MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
    }

    int getDistributorIndex() const override;
    const std::string& getClusterName() const override;
    const PendingMessageTracker& getPendingMessageTracker() const override;
    void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
    void sendReply(const std::shared_ptr<api::StorageReply>&) override;

    const BucketGcTimeCalculator::BucketIdHasher&
    getBucketIdHasher() const override {
        return *_bucketIdHasher;
    }

    DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; }
    const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; }

    DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept {
        return *_readOnlyBucketSpaceRepo;
    }
    const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const noexcept {
        return *_readOnlyBucketSpaceRepo;
    }
    /**
     * Misspelled legacy name ("Ready" instead of "Read") retained so that
     * existing callers keep compiling. Prefer the correctly spelled const
     * overload getReadOnlyBucketSpaceRepo() in new code.
     */
    const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept {
        return *_readOnlyBucketSpaceRepo;
    }

    OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;

    class Status;

    /**
     * Metric update hook that forwards each updateMetrics() callback to
     * Distributor::propagateInternalScanMetricsToExternal() on the
     * distributor it was created for.
     */
    class MetricUpdateHook : public framework::MetricUpdateHook
    {
    public:
        MetricUpdateHook(Distributor& self)
            : _self(self)
        {
        }

        void updateMetrics(const MetricLockGuard &) override {
            _self.propagateInternalScanMetricsToExternal();
        }

    private:
        Distributor& _self;
    };

    std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept {
        return _db_memory_sample_interval;
    }

private:
    friend struct DistributorTest;
    friend class BucketDBUpdaterTest;
    friend class DistributorTestUtil;
    friend class MetricUpdateHook;

    void setNodeStateUp();
    bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
    bool isMaintenanceReply(const api::StorageReply& reply) const;

    void handleStatusRequests();
    void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&);
    void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
    void startExternalOperations();

    /**
     * Return a copy of the latest min replica data, see MinReplicaProvider.
     */
    std::unordered_map<uint16_t, uint32_t> getMinReplica() const override;

    PerNodeBucketSpacesStats getBucketSpacesStats() const override;

    /**
     * Atomically publish internal metrics to external ideal state metrics.
     * Takes metric lock.
     */
    void propagateInternalScanMetricsToExternal();
    /**
     * Atomically updates internal metrics (not externally visible metrics;
     * these are not changed until a snapshot triggers
     * propagateInternalScanMetricsToExternal()).
     *
     * Takes metric lock.
     */
    void updateInternalMetricsForCompletedScan();
    void maybe_update_bucket_db_memory_usage_stats();
    void scanAllBuckets();
    MaintenanceScanner::ScanResult scanNextBucket();
    bool should_inhibit_current_maintenance_scan_tick() const noexcept;
    void mark_current_maintenance_tick_as_inhibited() noexcept;
    void mark_maintenance_tick_as_no_longer_inhibited() noexcept;
    void enableNextConfig();
    void fetchStatusRequests();
    void fetchExternalMessages();
    void startNextMaintenanceOperation();
    void signalWorkWasDone();
    bool workWasDone() const noexcept;

    void enterRecoveryMode();
    void leaveRecoveryMode();

    // Tries to generate an operation from the given message. Returns true
    // if we either returned an operation, or the message was otherwise handled
    // (for instance, wrong distribution).
    bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg,
                           Operation::SP& operation);

    void enableNextDistribution();
    void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
    void propagateClusterStates();

    BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
    template <typename NodeFunctor>
    void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&);
    void invalidate_bucket_spaces_stats();
    void send_updated_host_info_if_required();

    lib::ClusterStateBundle _clusterStateBundle;

    DistributorComponentRegister& _compReg;
    storage::DistributorComponent _component;
    std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
    // Read-only bucket space repo with DBs that only contain buckets transiently
    // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo
    // and the DBs are empty during non-transition phases.
    std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
    std::shared_ptr<DistributorMetricSet> _metrics;

    OperationOwner _operationOwner;
    OperationOwner _maintenanceOperationOwner;

    PendingMessageTracker _pendingMessageTracker;
    BucketDBUpdater _bucketDBUpdater;
    StatusReporterDelegate _distributorStatusDelegate;
    StatusReporterDelegate _bucketDBStatusDelegate;
    IdealStateManager _idealStateManager;
    ExternalOperationHandler _externalOperationHandler;

    std::shared_ptr<lib::Distribution> _distribution;
    std::shared_ptr<lib::Distribution> _nextDistribution;

    using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
    /**
     * Comparator over pointer-like operands that orders by getPriority().
     * Returns true when lhs has a numerically greater priority value than
     * rhs, so a std::priority_queue using it exposes the element with the
     * numerically lowest getPriority() value at top().
     * NOTE(review): presumably a lower numeric value means higher priority,
     * as the name suggests - confirm against api::StorageMessage.
     */
    struct IndirectHigherPriority {
        template <typename Lhs, typename Rhs>
        bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept {
            return lhs->getPriority() > rhs->getPriority();
        }
    };
    using ClientRequestPriorityQueue = std::priority_queue<
            std::shared_ptr<api::StorageMessage>,
            std::vector<std::shared_ptr<api::StorageMessage>>,
            IndirectHigherPriority
    >;
    MessageQueue _messageQueue;
    ClientRequestPriorityQueue _client_request_priority_queue;
    MessageQueue _fetchedMessages;
    framework::TickingThreadPool& _threadPool;
    vespalib::Monitor _statusMonitor;

    mutable std::vector<std::shared_ptr<Status>> _statusToDo;
    mutable std::vector<std::shared_ptr<Status>> _fetchedStatusRequests;

    // When true, getStorageNodeUpStates() includes 'i' in the returned state set.
    bool _initializingIsUp;

    DoneInitializeHandler& _doneInitializeHandler;
    // initializing() reports true until this flag is set.
    bool _doneInitializing;

    // Optional, non-owning. When null, getMessageSender() falls back to *this.
    ChainedMessageSender* _messageSender;

    std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb;
    std::unique_ptr<SimpleMaintenanceScanner> _scanner;
    std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter;
    std::unique_ptr<BlockingOperationStarter> _blockingStarter;
    std::unique_ptr<MaintenanceScheduler> _scheduler;
    // Compared against RECOVERY_SCHEDULING_MODE by isInRecoveryMode().
    MaintenanceScheduler::SchedulingMode _schedulingMode;
    framework::MilliSecTimer _recoveryTimeStarted;
    framework::ThreadWaitInfo _tickResult;
    const std::string _clusterName;
    BucketDBMetricUpdater _bucketDBMetricUpdater;
    std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
    MetricUpdateHook _metricUpdateHook;
    vespalib::Lock _metricLock;
    /**
     * Maintenance stats for last completed database scan iteration.
     * Access must be protected by _metricLock as it is read by metric
     * manager thread but written by distributor thread.
     */
    SimpleMaintenanceScanner::PendingMaintenanceStats _maintenanceStats;
    BucketSpacesStatsProvider::PerNodeBucketSpacesStats _bucketSpacesStats;
    BucketDBMetricUpdater::Stats _bucketDbStats;
    DistributorHostInfoReporter _hostInfoReporter;
    std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
    // Returned by db_memory_sample_interval().
    std::chrono::steady_clock::duration _db_memory_sample_interval;
    std::chrono::steady_clock::time_point _last_db_memory_sample_time_point;
    size_t _inhibited_maintenance_tick_count;
    bool _must_send_updated_host_info;
};

} // distributor
} // storage