diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-05-05 15:18:42 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-05-05 15:18:42 +0000 |
commit | 4c9730f04e9ad5172fbe2bfe901d406ce91be3a0 (patch) | |
tree | 317292979ea820e815b8100dadfa2d0f0db9b025 /storage | |
parent | 27d771c22b7eb6a18d6c483ef7df9d422dc79c95 (diff) |
Make status reporting from distributor and bucket db updater work when running in new stripe mode.
Diffstat (limited to 'storage')
12 files changed, 197 insertions, 44 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index ee36259bdd3..d903a818270 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -33,7 +33,7 @@ BucketDBUpdater::BucketDBUpdater(const DistributorNodeContext& node_ctx, ChainedMessageSender& chained_sender, std::shared_ptr<const lib::Distribution> bootstrap_distribution, StripeAccessor& stripe_accessor) - : framework::StatusReporter("temp_bucketdb", "Bucket DB Updater"), // TODO STRIPE rename once duplication is removed + : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _stripe_accessor(stripe_accessor), _active_state_bundle(lib::ClusterState()), _node_ctx(node_ctx), @@ -483,6 +483,13 @@ BucketDBUpdater::report_xml_status(vespalib::xml::XmlOutputStream& xos, << XmlAttribute("processingtime", i->_processingTime) << XmlEndTag(); } + xos << XmlEndTag() + << XmlTag("single_bucket_requests"); + auto guard = _stripe_accessor.rendezvous_and_hold_all(); + guard->report_single_bucket_requests(xos); + xos << XmlEndTag() + << XmlTag("delayed_single_bucket_requests"); + guard->report_delayed_single_bucket_requests(xos); xos << XmlEndTag() << XmlEndTag(); return ""; } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index cd42b0ecc1b..e5d25681148 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -87,9 +87,11 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _component.getDistribution(), *_stripe_accessor); _stripes.emplace_back(std::move(_stripe)); + _distributorStatusDelegate.registerStatusPage(); + _bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater); + _bucket_db_status_delegate->registerStatusPage(); } _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); - _distributorStatusDelegate.registerStatusPage(); hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter); propagateDefaultDistribution(_component.getDistribution()); }; @@ -514,7 +516,7 @@ Distributor::process_fetched_external_messages() } if (!_fetched_messages.empty()) { _fetched_messages.clear(); - _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; + signal_work_was_done(); } } @@ -524,6 +526,7 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; if (!_use_legacy_mode) { enableNextDistribution(); + fetch_status_requests(); fetch_external_messages(); } // Propagates any new configs down to stripe(s) @@ -543,6 +546,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx) _tickResult = _stripe->_tickResult; } else { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + handle_status_requests(); process_fetched_external_messages(); _bucket_db_updater->resend_delayed_messages(); } @@ -571,15 +575,45 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c } } +void +Distributor::fetch_status_requests() +{ + if (_fetched_status_requests.empty()) { + _fetched_status_requests.swap(_status_to_do); + } +} + +void +Distributor::handle_status_requests() +{ + for (auto& s : _fetched_status_requests) { + s->getReporter().reportStatus(s->getStream(), s->getPath()); + s->notifyCompleted(); + } + if (!_fetched_status_requests.empty()) { + _fetched_status_requests.clear(); + signal_work_was_done(); + } +} + +void +Distributor::signal_work_was_done() +{ + _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; +} + vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { - // This is const thread safe - // TODO STRIPE we should probably do this in the top-level distributor - if (_use_legacy_mode) { - return _stripe->getReportContentType(path); + assert(!_use_legacy_mode); + if (path.hasAttribute("page")) { + if (path.getAttribute("page") == "buckets") { + return "text/html"; + } else { + return "application/xml"; + } } else { - return first_stripe().getReportContentType(path); + return "text/html"; } } @@ -599,28 +633,54 @@ bool Distributor::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { - // TODO STRIPE need to aggregate status responses _across_ stripes..! - if (_use_legacy_mode) { - return _stripe->reportStatus(out, path); + assert(!_use_legacy_mode); + if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { + framework::PartlyHtmlStatusReporter htmlReporter(*this); + htmlReporter.reportHtmlHeader(out, path); + if (!path.hasAttribute("page")) { + out << "<a href=\"?page=pending\">Count of pending messages to " + << "storage nodes</a><br><a href=\"?page=maintenance&show=50\">" + << "List maintenance queue (adjust show parameter to see more " + << "operations, -1 for all)</a><br>\n<a href=\"?page=buckets\">" + << "List all buckets, highlight non-ideal state</a><br>\n"; + } else { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + const auto& op_ctx = _component; + for (const auto& space : op_ctx.bucket_space_repo()) { + out << "<h2>" << document::FixedBucketSpaces::to_string(space.first) << " - " << space.first << "</h2>\n"; + guard->report_bucket_db_status(space.first, out); + } + } + htmlReporter.reportHtmlFooter(out, path); } else { - auto guard = _stripe_accessor->rendezvous_and_hold_all(); - return first_stripe().reportStatus(out, path); + framework::PartlyXmlStatusReporter xmlReporter(*this, out, path); + using namespace vespalib::xml; + std::string page(path.getAttribute("page")); + + if (page == "pending") { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + auto stats = guard->pending_operation_stats(); + xmlReporter << XmlTag("pending") + << XmlAttribute("externalload", stats.external_load_operations) + << XmlAttribute("maintenance", stats.maintenance_operations) + << XmlEndTag(); + } } + return true; } bool Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const { - // TODO STRIPE need to aggregate status responses _across_ stripes..! - if (_use_legacy_mode) { - return _stripe->handleStatusRequest(request); - } else { - // Can't hold guard here or we'll deadlock by never allowing the thread to process the request - bool handled = first_stripe().handleStatusRequest(request); - // TODO STRIPE wake up stripe thread; handleStatusRequest waits for completion - // (not really needed since this will be removed) - return handled; + assert(!_use_legacy_mode); + auto wrappedRequest = std::make_shared<DistributorStatus>(request); + { + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + _status_to_do.push_back(wrappedRequest); + guard.broadcast(); } + wrappedRequest->waitForCompletion(); + return true; } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index f96160cf13c..4257657816a 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -168,6 +168,9 @@ private: void propagateInternalScanMetricsToExternal(); void scanAllBuckets(); void enableNextConfig(); + void fetch_status_requests(); + void handle_status_requests(); + void signal_work_was_done(); void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); @@ -192,7 +195,10 @@ private: std::shared_ptr<const DistributorConfiguration> _total_config; std::unique_ptr<BucketDBUpdater> _bucket_db_updater; StatusReporterDelegate _distributorStatusDelegate; + std::unique_ptr<StatusReporterDelegate> _bucket_db_status_delegate; framework::TickingThreadPool& _threadPool; + mutable std::vector<std::shared_ptr<DistributorStatus>> _status_to_do; + mutable std::vector<std::shared_ptr<DistributorStatus>> _fetched_status_requests; framework::ThreadWaitInfo _tickResult; MetricUpdateHook _metricUpdateHook; DistributorHostInfoReporter _hostInfoReporter; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index e9969b79bd4..cfa30e04642 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -85,7 +85,10 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _must_send_updated_host_info(false), _use_legacy_mode(use_legacy_mode) { - _bucketDBStatusDelegate.registerStatusPage(); + if (use_legacy_mode) { + _distributorStatusDelegate.registerStatusPage(); + _bucketDBStatusDelegate.registerStatusPage(); + } propagateDefaultDistribution(_component.getDistribution()); propagateClusterStates(); }; @@ -766,10 +769,11 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex) _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; if (!_use_legacy_mode) { std::lock_guard lock(_external_message_mutex); - fetchStatusRequests(); fetchExternalMessages(); } - handleStatusRequests(); + if (_use_legacy_mode) { + handleStatusRequests(); + } startExternalOperations(); if (initializing()) { _bucketDBUpdater.resendDelayedMessages(); @@ -841,6 +845,7 @@ DistributorStripe::propagate_config_snapshot_to_internal_components() void DistributorStripe::fetchStatusRequests() { + assert(_use_legacy_mode); if (_fetchedStatusRequests.empty()) { _fetchedStatusRequests.swap(_statusToDo); } @@ -856,6 +861,7 @@ DistributorStripe::fetchExternalMessages() void DistributorStripe::handleStatusRequests() { + assert(_use_legacy_mode); uint32_t sz = _fetchedStatusRequests.size(); for (uint32_t i = 0; i < sz; ++i) { auto& s = *_fetchedStatusRequests[i]; @@ -871,6 +877,7 @@ DistributorStripe::handleStatusRequests() vespalib::string DistributorStripe::getReportContentType(const framework::HttpUrlPath& path) const { + assert(_use_legacy_mode); if (path.hasAttribute("page")) { if (path.getAttribute("page") == "buckets") { return "text/html"; @@ -894,10 +901,12 @@ DistributorStripe::getActiveOperations() const return _operationOwner.toString(); } +// TODO STRIPE remove this; delegated to top-level Distributor only bool DistributorStripe::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { + assert(_use_legacy_mode); if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { framework::PartlyHtmlStatusReporter htmlReporter(*this); htmlReporter.reportHtmlHeader(out, path); @@ -920,8 +929,7 @@ DistributorStripe::reportStatus(std::ostream& out, if (page == "pending") { xmlReporter << XmlTag("pending") << XmlAttribute("externalload", _operationOwner.size()) - << XmlAttribute("maintenance", - _maintenanceOperationOwner.size()) + << XmlAttribute("maintenance",_maintenanceOperationOwner.size()) << XmlEndTag(); } else if (page == "maintenance") { // Need new page @@ -935,18 +943,21 @@ DistributorStripe::reportStatus(std::ostream& out, bool DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const { + assert(_use_legacy_mode); auto wrappedRequest = std::make_shared<DistributorStatus>(request); - if (_use_legacy_mode) { + { framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); _statusToDo.push_back(wrappedRequest); guard.broadcast(); - } else { - std::lock_guard lock(_external_message_mutex); - _statusToDo.push_back(wrappedRequest); - // FIXME won't be woken up explicitly, but will be processed after 1ms anyway. } wrappedRequest->waitForCompletion(); return true; } +StripeAccessGuard::PendingOperationStats +DistributorStripe::pending_operation_stats() const +{ + return {_operationOwner.size(), _maintenanceOperationOwner.size()}; +} + } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index e14369ba7eb..efded7d29d5 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -117,6 +117,8 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; + StripeAccessGuard::PendingOperationStats pending_operation_stats() const; + std::string getActiveIdealStateOperations() const; std::string getActiveOperations() const; diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index a090f00300b..17f3911c6ee 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -288,8 +288,6 @@ IdealStateManager::getBucketStatus( } void IdealStateManager::dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const { - out << "<h2>" << document::FixedBucketSpaces::to_string(bucket_space) << " - " << bucket_space << "</h2>\n"; - StatusBucketVisitor proc(*this, bucket_space, out); auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space)); distributorBucketSpace.getBucketDatabase().forEach(proc); @@ -300,6 +298,7 @@ void IdealStateManager::getBucketStatus(std::ostream& out) const { _distributorComponent.getDistributor().getClusterStateBundle().getVersion()); for (auto& space : _bucketSpaceRepo) { + out << "<h2>" << document::FixedBucketSpaces::to_string(space.first) << " - " << space.first << "</h2>\n"; dump_bucket_space_db_status(space.first, out); } } diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index ebcaad4cf96..0bffed449ef 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -68,6 +68,9 @@ public: IdealStateMetricSet& getMetrics() { return *_metrics; } + + void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const; + void getBucketStatus(std::ostream& out) const; // HtmlStatusReporter @@ -126,7 +129,6 @@ private: void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::ConstEntryRef& entry, NodeMaintenanceStatsTracker& statsTracker, std::ostream& out) const; - void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const; }; } diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 03e47d2cb67..a4a59745e3e 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -98,10 +98,35 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases(); } +void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const { + // TODO STRIPE multiple stripes + first_stripe().ideal_state_manager().dump_bucket_space_db_status(bucket_space, out); +} + +StripeAccessGuard::PendingOperationStats +MultiThreadedStripeAccessGuard::pending_operation_stats() const { + // TODO STRIPE multiple stripes + return first_stripe().pending_operation_stats(); +} + +void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().report_single_bucket_requests(xos); +} + +void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().report_delayed_single_bucket_requests(xos); +} + DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); } +const DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept { + return dynamic_cast<const DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); +} + std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { // For sanity checking of invariant of only one guard being allowed at any given time. assert(!_guard_held); diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index 03e36c29bba..a44f069d615 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -52,9 +52,16 @@ public: void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override; void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; void clear_read_only_bucket_repo_databases() override; + + void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override; + PendingOperationStats pending_operation_stats() const override; + void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; + void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; + private: // TODO STRIPE remove once multi threaded stripe support is implemented DistributorStripe& first_stripe() noexcept; + const DistributorStripe& first_stripe() const noexcept; }; /** diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index 1d570cbb3bc..a1779a4eb4f 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -16,6 +16,8 @@ class Distribution; namespace storage { class DistributorConfiguration; } +namespace vespalib::xml { class XmlOutputStream; } + namespace storage::distributor { /** @@ -54,6 +56,22 @@ public: virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0; virtual void clear_read_only_bucket_repo_databases() = 0; + // TODO STRIPE: Add merge() function. + struct PendingOperationStats { + size_t external_load_operations; + size_t maintenance_operations; + PendingOperationStats(size_t external_load_operations_in, + size_t maintenance_operations_in) + : external_load_operations(external_load_operations_in), + maintenance_operations(maintenance_operations_in) {} + }; + + // Functions used for state reporting + virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0; + virtual PendingOperationStats pending_operation_stats() const = 0; + virtual void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0; + virtual void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0; + }; /** diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index 8c24effa616..77151da19bc 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -21,9 +21,10 @@ #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.stripe_bucket_db_updater"); +using document::BucketSpace; using storage::lib::Node; using storage::lib::NodeType; -using document::BucketSpace; +using vespalib::xml::XmlAttribute; namespace storage::distributor { @@ -960,20 +961,30 @@ StripeBucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, } xos << XmlEndTag() << XmlTag("single_bucket_requests"); - for (const auto & entry : _sentMessages) - { - entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp)); - } + report_single_bucket_requests(xos); xos << XmlEndTag() << XmlTag("delayed_single_bucket_requests"); - for (const auto & entry : _delayedRequests) - { - entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); - } + report_delayed_single_bucket_requests(xos); xos << XmlEndTag() << XmlEndTag(); return ""; } +void +StripeBucketDBUpdater::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const +{ + for (const auto& entry : _sentMessages) { + entry.second.print_xml_tag(xos, XmlAttribute("sendtimestamp", entry.second.timestamp)); + } +} + +void +StripeBucketDBUpdater::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const +{ + for (const auto& entry : _delayedRequests) { + entry.second.print_xml_tag(xos, XmlAttribute("resendtimestamp", entry.first.getTime())); + } +} + StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover( const lib::ClusterState& oldState, const lib::ClusterState& s, diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index dddb4d2bc9a..5f843b8ff33 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -59,6 +59,11 @@ public: vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const; vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; + + // Functions used for state reporting when a StripeAccessGuard is held. + void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const; + void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const; + void print(std::ostream& out, bool verbose, const std::string& indent) const; const DistributorNodeContext& node_context() const { return _node_ctx; } DistributorStripeOperationContext& operation_context() { return _op_ctx; } |