diff options
9 files changed, 125 insertions, 58 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index 9c6ca9dff69..d7f3e77c6fd 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -5,8 +5,8 @@ namespace mbus { -RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) : - _lock(), +RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb, ctor_tag) + : _lock(), _orb(orb), _name(spec), _target(*_orb.GetTarget(spec.c_str())), @@ -48,6 +48,7 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) handler.handleVersion(_version.get()); } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); + req->getStash().create<SP>(shared_from_this()); req->SetMethodName("mbus.getVersion"); _target.InvokeAsync(req, vespalib::to_s(timeout), this); } @@ -67,8 +68,9 @@ RPCTarget::isValid() const } void -RPCTarget::RequestDone(FRT_RPCRequest *req) +RPCTarget::RequestDone(FRT_RPCRequest *raw_req) { + auto req = vespalib::ref_counted<FRT_RPCRequest>::internal_attach(raw_req); HandlerList handlers; { std::lock_guard guard(_lock); @@ -94,7 +96,6 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); } _cond.notify_all(); - req->internal_subref(); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index fffffae64f7..77fcef5f48f 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -13,7 +13,7 @@ namespace mbus { * target. Instances of this class are returned by {@link RPCService}, and * cached by {@link RPCTargetPool}. */ -class RPCTarget : public FRT_IRequestWait { +class RPCTarget : public FRT_IRequestWait, public std::enable_shared_from_this<RPCTarget> { public: /** * Declares a version handler used when resolving the version of a target. @@ -58,6 +58,7 @@ private: Version_UP _version; HandlerList _versionHandlers; + struct ctor_tag {}; public: /** * Convenience typedefs. @@ -72,7 +73,10 @@ public: * @param spec The connection spec of this target. * @param orb The FRT supervisor to use when connecting to target. */ - RPCTarget(const string &name, FRT_Supervisor &orb); + RPCTarget(const string &name, FRT_Supervisor &orb, ctor_tag); + static SP create(const string &name, FRT_Supervisor &orb) { + return std::make_shared<RPCTarget>(name, orb, ctor_tag{}); + } /** * Destructor. Subrefs the contained FRT target. diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index b403c65f863..db09b127114 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -97,7 +97,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) std::vector<RPCTarget::SP> targets; targets.reserve(_numTargetsPerSpec); for (size_t i(0); i < _numTargetsPerSpec; i++) { - targets.push_back(std::make_shared<RPCTarget>(spec, orb)); + targets.push_back(RPCTarget::create(spec, orb)); } _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime))); return _targets.find(spec)->second.getTarget(guard, currentTime); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java index 07a8d545178..e9dbfa0c524 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java @@ -102,6 +102,17 @@ public class Metrics { } } + public void deleteMetricByName(String application, String metricName, DimensionType type) { + synchronized (monitor) { + Optional.ofNullable(metrics.get(type)) + .map(m -> m.get(application)) + .map(ApplicationMetrics::metricsByDimensions) + .ifPresent(dims -> + dims.values().forEach(metrics -> metrics.remove(metricName)) + ); + } + } + Map<Dimensions, Map<String, MetricValue>> getOrCreateApplicationMetrics(String application, DimensionType type) { return metrics.computeIfAbsent(type, m -> new HashMap<>()) .computeIfAbsent(application, app -> new ApplicationMetrics()) diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index ede85c036b3..37d81f45ac1 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -314,7 +314,7 @@ DistributorStripe::enterRecoveryMode() { LOG(debug, "Entering recovery mode"); _schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE; - _scanner->reset(); + _scanner->reset(); // Just drop accumulated stat on the floor. // We enter recovery mode due to cluster state or distribution config changes. // Until we have completed a new DB scan round, we don't know the state of our // newly owned buckets and must not report stats for these out to the cluster @@ -643,7 +643,7 @@ DistributorStripe::updateInternalMetricsForCompletedScan() _bucketDBMetricUpdater.completeRound(); _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats(); - _maintenanceStats = _scanner->getPendingMaintenanceStats(); + _maintenanceStats = _scanner->reset(); auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats); if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) { _must_send_updated_host_info = true; @@ -684,7 +684,6 @@ DistributorStripe::scanNextBucket() updateInternalMetricsForCompletedScan(); leaveRecoveryMode(); send_updated_host_info_if_required(); - _scanner->reset(); } else { const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution()); _bucketDBMetricUpdater.visit(scanResult.getEntry(), distribution.getRedundancy()); diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp index 7ac99f5712f..592d92940d6 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "node_maintenance_stats_tracker.h" +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/stllike/hash_map_equal.hpp> #include <ostream> namespace storage::distributor { @@ -22,6 +24,54 @@ merge_bucket_spaces_stats(NodeMaintenanceStatsTracker::BucketSpacesStats& dest, } void +NodeMaintenanceStatsTracker::incMovingOut(uint16_t node, document::BucketSpace bucketSpace) { + ++_node_stats[node][bucketSpace].movingOut; + ++_total_stats.movingOut; +} + +void +NodeMaintenanceStatsTracker::incSyncing(uint16_t node, document::BucketSpace bucketSpace) { + ++_node_stats[node][bucketSpace].syncing; + ++_total_stats.syncing; +} + +void +NodeMaintenanceStatsTracker::incCopyingIn(uint16_t node, document::BucketSpace bucketSpace) { + ++_node_stats[node][bucketSpace].copyingIn; + ++_total_stats.copyingIn; +} + +void +NodeMaintenanceStatsTracker::incCopyingOut(uint16_t node, document::BucketSpace bucketSpace) { + ++_node_stats[node][bucketSpace].copyingOut; + ++_total_stats.copyingOut; +} + +void +NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker::incTotal(uint16_t node, document::BucketSpace bucketSpace) { + ++_node_stats[node][bucketSpace].total; + ++_total_stats.total; +} + +const NodeMaintenanceStats& +NodeMaintenanceStatsTracker::forNode(uint16_t node, document::BucketSpace bucketSpace) const { + auto nodeItr = _node_stats.find(node); + if (nodeItr != _node_stats.end()) { + auto bucketSpaceItr = nodeItr->second.find(bucketSpace); + if (bucketSpaceItr != nodeItr->second.end()) { + return bucketSpaceItr->second; + } + } + return _emptyNodeMaintenanceStats; +} + +bool +NodeMaintenanceStatsTracker::operator==(const NodeMaintenanceStatsTracker& rhs) const noexcept { + return ((_node_stats == rhs._node_stats) && + (_max_observed_time_since_last_gc == rhs._max_observed_time_since_last_gc)); +} + +void NodeMaintenanceStatsTracker::merge(const NodeMaintenanceStatsTracker& rhs) { for (const auto& entry : rhs._node_stats) { @@ -45,13 +95,24 @@ operator<<(std::ostream& os, const NodeMaintenanceStats& stats) return os; } -NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker() +NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker() noexcept : _node_stats(), _total_stats(), _max_observed_time_since_last_gc(0) {} +NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker(NodeMaintenanceStatsTracker &&) noexcept = default; +NodeMaintenanceStatsTracker & NodeMaintenanceStatsTracker::operator =(NodeMaintenanceStatsTracker &&) noexcept = default; +NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker(const NodeMaintenanceStatsTracker &) = default; NodeMaintenanceStatsTracker::~NodeMaintenanceStatsTracker() = default; +void +NodeMaintenanceStatsTracker::reset(size_t nodes) { + _node_stats.clear(); + _node_stats.resize(nodes); + _total_stats = NodeMaintenanceStats(); + _max_observed_time_since_last_gc = vespalib::duration::zero(); +} + } diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h index 3818dd4bacb..84705fbca9d 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <unordered_map> #include <vespa/document/bucket/bucketspace.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/stllike/hash_map.h> namespace storage::distributor { @@ -51,8 +51,8 @@ std::ostream& operator<<(std::ostream&, const NodeMaintenanceStats&); class NodeMaintenanceStatsTracker { public: - using BucketSpacesStats = std::unordered_map<document::BucketSpace, NodeMaintenanceStats, document::BucketSpace::hash>; - using PerNodeStats = std::unordered_map<uint16_t, BucketSpacesStats>; + using BucketSpacesStats = vespalib::hash_map<document::BucketSpace, NodeMaintenanceStats, document::BucketSpace::hash>; + using PerNodeStats = vespalib::hash_map<uint16_t, BucketSpacesStats>; private: PerNodeStats _node_stats; @@ -62,33 +62,23 @@ private: static const NodeMaintenanceStats _emptyNodeMaintenanceStats; public: - NodeMaintenanceStatsTracker(); + NodeMaintenanceStatsTracker() noexcept; + NodeMaintenanceStatsTracker(NodeMaintenanceStatsTracker &&) noexcept; + NodeMaintenanceStatsTracker & operator =(NodeMaintenanceStatsTracker &&) noexcept; + NodeMaintenanceStatsTracker(const NodeMaintenanceStatsTracker &); ~NodeMaintenanceStatsTracker(); + void reset(size_t nodes); + size_t numNodes() const { return _node_stats.size(); } - void incMovingOut(uint16_t node, document::BucketSpace bucketSpace) { - ++_node_stats[node][bucketSpace].movingOut; - ++_total_stats.movingOut; - } + void incMovingOut(uint16_t node, document::BucketSpace bucketSpace); - void incSyncing(uint16_t node, document::BucketSpace bucketSpace) { - ++_node_stats[node][bucketSpace].syncing; - ++_total_stats.syncing; - } + void incSyncing(uint16_t node, document::BucketSpace bucketSpace); - void incCopyingIn(uint16_t node, document::BucketSpace bucketSpace) { - ++_node_stats[node][bucketSpace].copyingIn; - ++_total_stats.copyingIn; - } + void incCopyingIn(uint16_t node, document::BucketSpace bucketSpace); - void incCopyingOut(uint16_t node, document::BucketSpace bucketSpace) { - ++_node_stats[node][bucketSpace].copyingOut; - ++_total_stats.copyingOut; - } + void incCopyingOut(uint16_t node, document::BucketSpace bucketSpace); - void incTotal(uint16_t node, document::BucketSpace bucketSpace) { - ++_node_stats[node][bucketSpace].total; - ++_total_stats.total; - } + void incTotal(uint16_t node, document::BucketSpace bucketSpace); void update_observed_time_since_last_gc(vespalib::duration time_since_gc) noexcept { _max_observed_time_since_last_gc = std::max(time_since_gc, _max_observed_time_since_last_gc); @@ -98,18 +88,9 @@ public: * Returned statistics for a given node index and bucket space, or all zero statistics * if none have been recorded yet */ - const NodeMaintenanceStats& forNode(uint16_t node, document::BucketSpace bucketSpace) const { - auto nodeItr = _node_stats.find(node); - if (nodeItr != _node_stats.end()) { - auto bucketSpaceItr = nodeItr->second.find(bucketSpace); - if (bucketSpaceItr != nodeItr->second.end()) { - return bucketSpaceItr->second; - } - } - return _emptyNodeMaintenanceStats; - } + const NodeMaintenanceStats& forNode(uint16_t node, document::BucketSpace bucketSpace) const; - const PerNodeStats& perNodeStats() const { + const PerNodeStats& perNodeStats() const noexcept { return _node_stats; } @@ -124,10 +105,7 @@ public: return _max_observed_time_since_last_gc; } - bool operator==(const NodeMaintenanceStatsTracker& rhs) const { - return ((_node_stats == rhs._node_stats) && - (_max_observed_time_since_last_gc == rhs._max_observed_time_since_last_gc)); - } + bool operator==(const NodeMaintenanceStatsTracker& rhs) const noexcept; void merge(const NodeMaintenanceStatsTracker& rhs); }; diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp index afcbef32584..e0c1abaaffa 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp @@ -41,11 +41,20 @@ SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanc perNodeStats.merge(rhs.perNodeStats); } -SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() = default; +SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() noexcept = default; SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default; SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default; +SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(PendingMaintenanceStats &&) noexcept = default; SimpleMaintenanceScanner::PendingMaintenanceStats & -SimpleMaintenanceScanner::PendingMaintenanceStats::operator = (const PendingMaintenanceStats &) = default; +SimpleMaintenanceScanner::PendingMaintenanceStats::operator = (PendingMaintenanceStats &&) noexcept = default; + +SimpleMaintenanceScanner::PendingMaintenanceStats +SimpleMaintenanceScanner::PendingMaintenanceStats::reset() { + PendingMaintenanceStats prev = std::move(*this); + global = GlobalMaintenanceStats(); + perNodeStats.reset(prev.perNodeStats.numNodes()); + return prev; +} MaintenanceScanner::ScanResult SimpleMaintenanceScanner::scanNext() @@ -68,12 +77,12 @@ SimpleMaintenanceScanner::scanNext() } } -void +SimpleMaintenanceScanner::PendingMaintenanceStats SimpleMaintenanceScanner::reset() { _bucketCursor = document::BucketId(); _bucketSpaceItr = _bucketSpaceRepo.begin(); - _pendingMaintenance = PendingMaintenanceStats(); + return _pendingMaintenance.reset(); } void diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h index 7af61815c31..35b022c7af7 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h @@ -23,11 +23,14 @@ public: void merge(const GlobalMaintenanceStats& rhs) noexcept; }; struct PendingMaintenanceStats { - PendingMaintenanceStats(); + PendingMaintenanceStats() noexcept; PendingMaintenanceStats(const PendingMaintenanceStats &); - PendingMaintenanceStats &operator = (const PendingMaintenanceStats &); + PendingMaintenanceStats &operator = (const PendingMaintenanceStats &) = delete; + PendingMaintenanceStats(PendingMaintenanceStats &&) noexcept; + PendingMaintenanceStats &operator = (PendingMaintenanceStats &&) noexcept; ~PendingMaintenanceStats(); - GlobalMaintenanceStats global; + PendingMaintenanceStats reset(); + GlobalMaintenanceStats global; NodeMaintenanceStatsTracker perNodeStats; void merge(const PendingMaintenanceStats& rhs); @@ -50,11 +53,12 @@ public: ~SimpleMaintenanceScanner() override; ScanResult scanNext() override; - void reset(); + PendingMaintenanceStats reset(); // TODO: move out into own interface! void prioritizeBucket(const document::Bucket &id); + // TODO Only for testing const PendingMaintenanceStats& getPendingMaintenanceStats() const noexcept { return _pendingMaintenance; } |