From b953eaa236aabf8bc883a41fb5345272200dccf4 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 23 Mar 2021 12:40:34 +0000 Subject: Simplify Distributor class to remove now unused functionality In particular, remove `DistributorStripeInterface` inheritance and subclassed `DistributorComponent` usage. --- .../src/vespa/storage/distributor/distributor.cpp | 133 +-------------------- .../src/vespa/storage/distributor/distributor.h | 132 ++++---------------- .../storage/distributor/distributor_stripe.cpp | 1 - .../vespa/storage/distributor/distributor_stripe.h | 17 --- 4 files changed, 25 insertions(+), 258 deletions(-) (limited to 'storage') diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 29af083cf4e..f886640d1f9 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -46,22 +46,16 @@ Distributor::Distributor(DistributorComponentRegister& compReg, HostInfo& hostInfoReporterRegistrar, ChainedMessageSender* messageSender) : StorageLink("distributor"), - DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), _metrics(std::make_shared()), _messageSender(messageSender), _stripe(std::make_unique(compReg, *_metrics, node_identity, threadPool, doneInitHandler, manageActiveBucketCopies, *this)), - // TODO STRIPE remove once DistributorStripeComponent no longer references bucket space repos - _bucketSpaceRepo(std::make_unique(node_identity.node_index())), - _readOnlyBucketSpaceRepo(std::make_unique(node_identity.node_index())), - // TODO STRIPE slim down - _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"), + _component(compReg, "distributor"), _distributorStatusDelegate(compReg, *this, *this), _threadPool(threadPool), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), _metricUpdateHook(*this), - _metricLock(), _hostInfoReporter(*this, *this) { _component.registerMetric(*_metrics); @@ -82,11 +76,6 @@ Distributor::isInRecoveryMode() const noexcept { return _stripe->isInRecoveryMode(); } -int -Distributor::getDistributorIndex() const { - return _component.getIndex(); -} - const PendingMessageTracker& Distributor::getPendingMessageTracker() const { return _stripe->getPendingMessageTracker(); @@ -169,27 +158,6 @@ Distributor::db_memory_sample_interval() const noexcept { return _stripe->db_memory_sample_interval(); } -bool Distributor::initializing() const { - return _stripe->initializing(); -} - -const lib::ClusterState* -Distributor::pendingClusterStateOrNull(const document::BucketSpace& space) const { - return bucket_db_updater().pendingClusterStateOrNull(space); -} - -void -Distributor::sendCommand(const std::shared_ptr&) -{ - assert(false); // TODO STRIPE -} - -void -Distributor::sendReply(const std::shared_ptr&) -{ - assert(false); // TODO STRIPE -} - void Distributor::setNodeStateUp() { @@ -217,22 +185,11 @@ Distributor::onOpen() } } -void Distributor::send_shutdown_abort_reply(const std::shared_ptr& msg) { - api::StorageReply::UP reply( - std::dynamic_pointer_cast(msg)->makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down")); - sendUp(std::shared_ptr(reply.release())); -} - void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); _stripe->close(); } -void Distributor::send_up_without_tracking(const std::shared_ptr&) { - assert(false); -} - void Distributor::sendUp(const std::shared_ptr& msg) { @@ -259,19 +216,6 @@ Distributor::onDown(const std::shared_ptr& msg) return _stripe->onDown(msg); } -void -Distributor::handleCompletedMerge( - const std::shared_ptr&) -{ - assert(false); -} - -bool -Distributor::isMaintenanceReply(const api::StorageReply&) const -{ - assert(false); -} - bool Distributor::handleReply(const std::shared_ptr& reply) { @@ -298,16 +242,6 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) _stripe->enableClusterStateBundle(state); } -OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket&) const { - abort(); -} - -void -Distributor::notifyDistributionChangeEnabled() -{ - _stripe->notifyDistributionChangeEnabled(); -} - void Distributor::storageDistributionChanged() { @@ -315,43 +249,6 @@ Distributor::storageDistributionChanged() _stripe->storageDistributionChanged(); } -void -Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) { - bucket_db_updater().recheckBucketInfo(nodeIdx, bucket); -} - -namespace { - -class SplitChecker : public PendingMessageTracker::Checker -{ -public: - bool found; - uint8_t maxPri; - - SplitChecker(uint8_t maxP) : found(false), maxPri(maxP) {}; - - bool check(uint32_t msgType, uint16_t node, uint8_t pri) override { - (void) node; - (void) pri; - if (msgType == api::MessageType::SPLITBUCKET_ID && pri <= maxPri) { - found = true; - return false; - } - - return true; - } -}; - -} - -void -Distributor::checkBucketForSplit(document::BucketSpace, - const BucketDatabase::Entry&, - uint8_t) -{ - assert(false); -} - void Distributor::enableNextDistribution() { @@ -366,24 +263,6 @@ Distributor::propagateDefaultDistribution( _stripe->propagateDefaultDistribution(std::move(distribution)); } -void -Distributor::propagateClusterStates() -{ - assert(false); -} - -void -Distributor::signalWorkWasDone() -{ - _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; -} - -bool -Distributor::workWasDone() const noexcept -{ - return !_tickResult.waitWanted(); -} - std::unordered_map Distributor::getMinReplica() const { @@ -410,10 +289,6 @@ Distributor::propagateInternalScanMetricsToExternal() _stripe->propagateInternalScanMetricsToExternal(); } -void Distributor::maybe_update_bucket_db_memory_usage_stats() { - assert(false); -} - void Distributor::scanAllBuckets() { @@ -447,12 +322,6 @@ Distributor::enableNextConfig() _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call } -void -Distributor::handleStatusRequests() -{ - assert(false); -} - vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 261343b83fd..bfffe126b44 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -12,10 +12,10 @@ #include "pendingmessagetracker.h" #include "statusreporterdelegate.h" #include +#include #include #include #include -#include #include #include #include @@ -41,14 +41,13 @@ class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; class ThrottlingOperationStarter; -class Distributor : public StorageLink, - public DistributorStripeInterface, - public StatusDelegator, - public framework::StatusReporter, - public framework::TickingThread, - public MinReplicaProvider, - public BucketSpacesStatsProvider, - public NonTrackingMessageSender +class Distributor final + : public StorageLink, + public StatusDelegator, + public framework::StatusReporter, + public framework::TickingThread, + public MinReplicaProvider, + public BucketSpacesStatsProvider { public: Distributor(DistributorComponentRegister&, @@ -61,7 +60,7 @@ public: ~Distributor() override; - const ClusterContext& cluster_context() const override { + const ClusterContext& cluster_context() const { return _component.cluster_context(); } void onOpen() override; @@ -69,40 +68,18 @@ public: bool onDown(const std::shared_ptr&) override; void sendUp(const std::shared_ptr&) override; void sendDown(const std::shared_ptr&) override; - // Bypasses message tracker component. Thread safe. - void send_up_without_tracking(const std::shared_ptr&) override; - ChainedMessageSender& getMessageSender() override { - abort(); // TODO STRIPE - } - - DistributorMetricSet& getMetrics() override { return *_metrics; } - - const OperationSequencer& operation_sequencer() const noexcept override { - abort(); // TODO STRIPE - } - - const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override; + DistributorMetricSet& getMetrics() { return *_metrics; } /** * Enables a new cluster state. Called after the bucket db updater has * retrieved all bucket info related to the change. */ - void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override; - - /** - * Invoked when a pending cluster state for a distribution (config) - * change has been enabled. An invocation of storageDistributionChanged - * will eventually cause this method to be called, assuming the pending - * cluster state completed successfully. - */ - void notifyDistributionChangeEnabled() override; + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle); void storageDistributionChanged() override; - void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override; - - bool handleReply(const std::shared_ptr& reply) override; + bool handleReply(const std::shared_ptr& reply); // StatusReporter implementation vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; @@ -115,40 +92,13 @@ public: virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; - /** - * Checks whether a bucket needs to be split, and sends a split - * if so. - */ - void checkBucketForSplit(document::BucketSpace bucketSpace, - const BucketDatabase::Entry& e, - uint8_t priority) override; - - const lib::ClusterStateBundle& getClusterStateBundle() const override; - - /** - * @return Returns the states in which the distributors consider - * storage nodes to be up. - */ - const char* getStorageNodeUpStates() const override { - return "uri"; - } - - /** - * Called by bucket db updater after a merge has finished, and all the - * request bucket info operations have been performed as well. Passes the - * merge back to the operation that created it. - */ - void handleCompletedMerge(const std::shared_ptr& reply) override; - - bool initializing() const override; - - const DistributorConfiguration& getConfig() const override; + const lib::ClusterStateBundle& getClusterStateBundle() const; + const DistributorConfiguration& getConfig() const; bool isInRecoveryMode() const noexcept; - int getDistributorIndex() const override; - PendingMessageTracker& getPendingMessageTracker() override; - const PendingMessageTracker& getPendingMessageTracker() const override; + PendingMessageTracker& getPendingMessageTracker(); + const PendingMessageTracker& getPendingMessageTracker() const; DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; @@ -157,16 +107,6 @@ public: storage::distributor::DistributorStripeComponent& distributor_component() noexcept; - void sendCommand(const std::shared_ptr&) override; - void sendReply(const std::shared_ptr&) override; - - const BucketGcTimeCalculator::BucketIdHasher& - getBucketIdHasher() const override { - abort(); // TODO STRIPE - } - - OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; - class MetricUpdateHook : public framework::MetricUpdateHook { public: @@ -193,12 +133,6 @@ private: void setNodeStateUp(); bool handleMessage(const std::shared_ptr& msg); - bool isMaintenanceReply(const api::StorageReply& reply) const; - - void handleStatusRequests(); - void send_shutdown_abort_reply(const std::shared_ptr&); - void handle_or_propagate_message(const std::shared_ptr& msg); - void startExternalOperations(); // Accessors used by tests BucketDBUpdater& bucket_db_updater(); @@ -223,39 +157,21 @@ private: * Takes metric lock. */ void propagateInternalScanMetricsToExternal(); - void maybe_update_bucket_db_memory_usage_stats(); void scanAllBuckets(); void enableNextConfig(); - void signalWorkWasDone(); - bool workWasDone() const noexcept; - void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr); - void propagateClusterStates(); std::shared_ptr _metrics; - ChainedMessageSender* _messageSender; + ChainedMessageSender* _messageSender; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. - std::unique_ptr _stripe; - - std::unique_ptr _bucketSpaceRepo; - // Read-only bucket space repo with DBs that only contain buckets transiently - // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo - // and the DBs are empty during non-transition phases. - std::unique_ptr _readOnlyBucketSpaceRepo; - storage::distributor::DistributorStripeComponent _component; - - StatusReporterDelegate _distributorStatusDelegate; - - framework::TickingThreadPool& _threadPool; - - mutable std::vector> _statusToDo; - mutable std::vector> _fetchedStatusRequests; - - framework::ThreadWaitInfo _tickResult; - MetricUpdateHook _metricUpdateHook; - mutable std::mutex _metricLock; - DistributorHostInfoReporter _hostInfoReporter; + std::unique_ptr _stripe; + storage::DistributorComponent _component; + StatusReporterDelegate _distributorStatusDelegate; + framework::TickingThreadPool& _threadPool; + framework::ThreadWaitInfo _tickResult; + MetricUpdateHook _metricUpdateHook; + DistributorHostInfoReporter _hostInfoReporter; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 0a3befbbf5c..e41c7940a0d 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -73,7 +73,6 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _recoveryTimeStarted(_component.getClock()), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), _bucketIdHasher(std::make_unique()), - _metricUpdateHook(*this), _metricLock(), _maintenanceStats(), _bucketSpacesStats(), diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index c6ada1f1ad7..10b3f54d834 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -188,22 +188,6 @@ public: OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; - class MetricUpdateHook : public framework::MetricUpdateHook - { - public: - MetricUpdateHook(DistributorStripe& self) - : _self(self) - { - } - - void updateMetrics(const MetricLockGuard &) override { - _self.propagateInternalScanMetricsToExternal(); - } - - private: - DistributorStripe& _self; - }; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept { return _db_memory_sample_interval; } @@ -335,7 +319,6 @@ private: framework::ThreadWaitInfo _tickResult; BucketDBMetricUpdater _bucketDBMetricUpdater; std::unique_ptr _bucketIdHasher; - MetricUpdateHook _metricUpdateHook; mutable std::mutex _metricLock; /** * Maintenance stats for last completed database scan iteration. -- cgit v1.2.3