1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "bucket_spaces_stats_provider.h"
#include "cluster_state_bundle_activation_listener.h"
#include "top_level_bucket_db_updater.h"
#include "distributor_component.h"
#include "distributor_host_info_reporter.h"
#include "distributor_interface.h"
#include "distributor_stripe_interface.h"
#include "externaloperationhandler.h"
#include "ideal_state_total_metrics.h"
#include "idealstatemanager.h"
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include "stripe_bucket_db_updater.h" // TODO this is temporary
#include "stripe_host_info_notifier.h"
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
#include <vespa/vdslib/state/random.h>
#include <chrono>
#include <queue>
#include <unordered_map>
// Forward declarations of storage-level types that the TopLevelDistributor
// constructor declared below only takes by reference.
namespace storage {
struct DoneInitializeHandler;
class HostInfo;
}
namespace storage::distributor {
// Forward declarations of distributor types. The ones TopLevelDistributor refers to
// in this header are only held through references or smart pointers.
class BlockingOperationStarter;
class BucketPriorityDatabase;
class TopLevelBucketDBUpdater;
class DistributorBucketSpaceRepo;
class DistributorStatus;
class DistributorStripe;
class DistributorStripePool;
class DistributorTotalMetrics;
class StripeAccessor;
class OperationSequencer;
class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
/**
 * Top-level distributor component.
 *
 * Acts as a StorageLink in the storage chain while also implementing the
 * distributor, status reporting/delegation, ticking thread, min-replica,
 * bucket space stats, stripe host info notification and cluster state bundle
 * activation listener interfaces listed in the base class list below.
 *
 * It owns a set of DistributorStripe instances (_stripes) and keeps a reference
 * to the DistributorStripePool given at construction time.
 */
class TopLevelDistributor final
    : public StorageLink,
      public DistributorInterface,
      public StatusDelegator,
      public framework::StatusReporter,
      public framework::TickingThread,
      public MinReplicaProvider,
      public BucketSpacesStatsProvider,
      public StripeHostInfoNotifier,
      public ClusterStateBundleActivationListener
{
public:
    /**
     * Construction is implemented out of line. The trailing ChainedMessageSender
     * argument is optional and defaults to nullptr.
     */
    TopLevelDistributor(DistributorComponentRegister&,
                        const NodeIdentity& node_identity,
                        framework::TickingThreadPool&,
                        DistributorStripePool& stripe_pool,
                        DoneInitializeHandler&,
                        uint32_t num_distributor_stripes,
                        HostInfo& hostInfoReporterRegistrar,
                        ChainedMessageSender* = nullptr);

    ~TopLevelDistributor() override;

    // StorageLink overrides
    void onOpen() override;
    void onClose() override;
    bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
    void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
    void sendDown(const std::shared_ptr<api::StorageMessage>&) override;

    void start_stripe_pool();

    DistributorMetricSet& getMetrics();

    /** Returns the node identity stored in this distributor. */
    const NodeIdentity& node_identity() const noexcept { return _node_identity; }

    /** Returns the current value of the _done_initializing flag. */
    [[nodiscard]] bool done_initializing() const noexcept { return _done_initializing; }

    // Implements DistributorInterface and DistributorMessageSender.
    DistributorMetricSet& metrics() override { return getMetrics(); }
    const DistributorConfiguration& config() const override;

    void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override;
    void sendReply(const std::shared_ptr<api::StorageReply>& reply) override;
    int getDistributorIndex() const override { return _component.node_index(); }
    const ClusterContext& cluster_context() const override { return _component.cluster_context(); }

    void storageDistributionChanged() override;

    // StatusReporter implementation
    vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
    bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;

    bool handleStatusRequest(const DelegatedStatusRequest& request) const override;

    framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
    framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;

    // Called by DistributorStripe threads when they want to notify the cluster controller of changed stats.
    // Thread safe.
    void notify_stripe_wants_to_send_host_info(uint16_t stripe_index) override;

    /**
     * Metric update hook that forwards updateMetrics() to
     * TopLevelDistributor::propagateInternalScanMetricsToExternal().
     *
     * Only a reference to the distributor is stored, so the hook must not be
     * used after the referenced distributor has been destroyed.
     */
    class MetricUpdateHook : public framework::MetricUpdateHook
    {
    public:
        // explicit: a TopLevelDistributor& must never silently convert into a hook.
        explicit MetricUpdateHook(TopLevelDistributor& self)
            : _self(self)
        {
        }

        void updateMetrics(const MetricLockGuard &) override {
            _self.propagateInternalScanMetricsToExternal();
        }

    private:
        TopLevelDistributor& _self;
    };

private:
    // Test utilities/fixtures and the metric hook need access to private state.
    friend class DistributorStripeTestUtil;
    friend class TopLevelDistributorTestUtil;
    friend class MetricUpdateHook;
    friend struct DistributorStripeTest;
    friend struct TopLevelDistributorTest;

    void setNodeStateUp();

    /**
     * Return a copy of the latest min replica data, see MinReplicaProvider.
     */
    MinReplicaMap getMinReplica() const override;

    PerNodeBucketSpacesStats getBucketSpacesStats() const override;

    /**
     * Atomically publish internal metrics to external ideal state metrics.
     * Takes metric lock.
     */
    void propagateInternalScanMetricsToExternal();

    void enable_next_config_if_changed();
    void fetch_status_requests();
    void handle_status_requests();
    void signal_work_was_done();
    [[nodiscard]] bool work_was_done() const noexcept;
    void enable_next_distribution_if_changed();
    void propagate_default_distribution_thread_unsafe(std::shared_ptr<const lib::Distribution> distribution);
    void un_inhibit_maintenance_if_safe_time_passed();

    void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
    void fetch_external_messages();
    void process_fetched_external_messages();
    void send_host_info_if_appropriate();
    // Precondition: _stripe_scan_notify_mutex is held
    [[nodiscard]] bool may_send_host_info_on_behalf_of_stripes(std::lock_guard<std::mutex>& held_lock) noexcept;

    uint32_t random_stripe_idx();
    uint32_t stripe_of_bucket_id(const document::BucketId& bucket_id, const api::StorageMessage& msg);

    // ClusterStateBundleActivationListener impl:
    void on_cluster_state_bundle_activated(const lib::ClusterStateBundle& new_bundle,
                                           bool has_bucket_ownership_transfer) override;

    // Per-stripe flags; one entry per stripe is kept in _stripe_scan_stats.
    struct StripeScanStats {
        bool wants_to_send_host_info = false;
        bool has_reported_in_at_least_once = false;
    };

    using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;

    // NOTE: data members are initialized in declaration order; do not reorder
    // them without checking the constructor's initializer list.
    const NodeIdentity                                _node_identity;
    DistributorComponentRegister&                     _comp_reg;
    DoneInitializeHandler&                            _done_init_handler;
    bool                                              _done_initializing;
    std::shared_ptr<DistributorTotalMetrics>          _total_metrics;
    std::shared_ptr<IdealStateTotalMetrics>           _ideal_state_total_metrics;
    ChainedMessageSender*                             _messageSender;
    uint8_t                                           _n_stripe_bits;
    DistributorStripePool&                            _stripe_pool;
    std::vector<std::unique_ptr<DistributorStripe>>   _stripes;
    std::unique_ptr<StripeAccessor>                   _stripe_accessor;
    storage::lib::RandomGen                           _random_stripe_gen;
    std::mutex                                        _random_stripe_gen_mutex;
    MessageQueue                                      _message_queue; // Queue for top-level ops
    MessageQueue                                      _fetched_messages;
    distributor::DistributorComponent                 _component;
    storage::DistributorComponent                     _ideal_state_component;
    std::shared_ptr<const DistributorConfiguration>   _total_config;
    std::unique_ptr<TopLevelBucketDBUpdater>          _bucket_db_updater;
    StatusReporterDelegate                            _distributorStatusDelegate;
    std::unique_ptr<StatusReporterDelegate>           _bucket_db_status_delegate;
    framework::TickingThreadPool&                     _threadPool;

    // Mutable so that the const status request handling path can update them.
    mutable std::vector<std::shared_ptr<DistributorStatus>> _status_to_do;
    mutable std::vector<std::shared_ptr<DistributorStatus>> _fetched_status_requests;

    mutable std::mutex                                _stripe_scan_notify_mutex;
    std::vector<StripeScanStats>                      _stripe_scan_stats; // Indices are 1-1 with _stripes entries
    vespalib::steady_time                             _last_host_info_send_time;
    vespalib::duration                                _host_info_send_delay;
    // Ideally this would use steady_clock, but for now let's use the same semantics as
    // feed blocking during safe time periods.
    vespalib::system_time                             _maintenance_safe_time_point;
    std::chrono::seconds                              _maintenance_safe_time_delay;

    framework::ThreadWaitInfo                         _tickResult;
    MetricUpdateHook                                  _metricUpdateHook;
    DistributorHostInfoReporter                       _hostInfoReporter;

    mutable std::mutex                                _distribution_mutex;
    std::shared_ptr<lib::Distribution>                _distribution;
    std::shared_ptr<lib::Distribution>                _next_distribution;

    uint64_t                                          _current_internal_config_generation;
};
}
|