summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h8
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp2
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java11
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp63
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h56
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp17
-rw-r--r--storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h12
9 files changed, 125 insertions, 58 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index 9c6ca9dff69..d7f3e77c6fd 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -5,8 +5,8 @@
namespace mbus {
-RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) :
- _lock(),
+RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb, ctor_tag)
+ : _lock(),
_orb(orb),
_name(spec),
_target(*_orb.GetTarget(spec.c_str())),
@@ -48,6 +48,7 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler)
handler.handleVersion(_version.get());
} else if (shouldInvoke) {
FRT_RPCRequest *req = _orb.AllocRPCRequest();
+ req->getStash().create<SP>(shared_from_this());
req->SetMethodName("mbus.getVersion");
_target.InvokeAsync(req, vespalib::to_s(timeout), this);
}
@@ -67,8 +68,9 @@ RPCTarget::isValid() const
}
void
-RPCTarget::RequestDone(FRT_RPCRequest *req)
+RPCTarget::RequestDone(FRT_RPCRequest *raw_req)
{
+ auto req = vespalib::ref_counted<FRT_RPCRequest>::internal_attach(raw_req);
HandlerList handlers;
{
std::lock_guard guard(_lock);
@@ -94,7 +96,6 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
_state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED);
}
_cond.notify_all();
- req->internal_subref();
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index fffffae64f7..77fcef5f48f 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -13,7 +13,7 @@ namespace mbus {
* target. Instances of this class are returned by {@link RPCService}, and
* cached by {@link RPCTargetPool}.
*/
-class RPCTarget : public FRT_IRequestWait {
+class RPCTarget : public FRT_IRequestWait, public std::enable_shared_from_this<RPCTarget> {
public:
/**
* Declares a version handler used when resolving the version of a target.
@@ -58,6 +58,7 @@ private:
Version_UP _version;
HandlerList _versionHandlers;
+ struct ctor_tag {};
public:
/**
* Convenience typedefs.
@@ -72,7 +73,10 @@ public:
* @param spec The connection spec of this target.
* @param orb The FRT supervisor to use when connecting to target.
*/
- RPCTarget(const string &name, FRT_Supervisor &orb);
+ RPCTarget(const string &name, FRT_Supervisor &orb, ctor_tag);
+ static SP create(const string &name, FRT_Supervisor &orb) {
+ return std::make_shared<RPCTarget>(name, orb, ctor_tag{});
+ }
/**
* Destructor. Subrefs the contained FRT target.
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index b403c65f863..db09b127114 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -97,7 +97,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
std::vector<RPCTarget::SP> targets;
targets.reserve(_numTargetsPerSpec);
for (size_t i(0); i < _numTargetsPerSpec; i++) {
- targets.push_back(std::make_shared<RPCTarget>(spec, orb));
+ targets.push_back(RPCTarget::create(spec, orb));
}
_targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime)));
return _targets.find(spec)->second.getTarget(guard, currentTime);
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java
index 07a8d545178..e9dbfa0c524 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/container/metrics/Metrics.java
@@ -102,6 +102,17 @@ public class Metrics {
}
}
+ public void deleteMetricByName(String application, String metricName, DimensionType type) {
+ synchronized (monitor) {
+ Optional.ofNullable(metrics.get(type))
+ .map(m -> m.get(application))
+ .map(ApplicationMetrics::metricsByDimensions)
+ .ifPresent(dims ->
+ dims.values().forEach(metrics -> metrics.remove(metricName))
+ );
+ }
+ }
+
Map<Dimensions, Map<String, MetricValue>> getOrCreateApplicationMetrics(String application, DimensionType type) {
return metrics.computeIfAbsent(type, m -> new HashMap<>())
.computeIfAbsent(application, app -> new ApplicationMetrics())
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index ede85c036b3..37d81f45ac1 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -314,7 +314,7 @@ DistributorStripe::enterRecoveryMode()
{
LOG(debug, "Entering recovery mode");
_schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE;
- _scanner->reset();
+ _scanner->reset(); // Just drop accumulated stat on the floor.
// We enter recovery mode due to cluster state or distribution config changes.
// Until we have completed a new DB scan round, we don't know the state of our
// newly owned buckets and must not report stats for these out to the cluster
@@ -643,7 +643,7 @@ DistributorStripe::updateInternalMetricsForCompletedScan()
_bucketDBMetricUpdater.completeRound();
_bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats();
- _maintenanceStats = _scanner->getPendingMaintenanceStats();
+ _maintenanceStats = _scanner->reset();
auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats);
if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) {
_must_send_updated_host_info = true;
@@ -684,7 +684,6 @@ DistributorStripe::scanNextBucket()
updateInternalMetricsForCompletedScan();
leaveRecoveryMode();
send_updated_host_info_if_required();
- _scanner->reset();
} else {
const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution());
_bucketDBMetricUpdater.visit(scanResult.getEntry(), distribution.getRedundancy());
diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
index 7ac99f5712f..592d92940d6 100644
--- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "node_maintenance_stats_tracker.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/stllike/hash_map_equal.hpp>
#include <ostream>
namespace storage::distributor {
@@ -22,6 +24,54 @@ merge_bucket_spaces_stats(NodeMaintenanceStatsTracker::BucketSpacesStats& dest,
}
void
+NodeMaintenanceStatsTracker::incMovingOut(uint16_t node, document::BucketSpace bucketSpace) {
+ ++_node_stats[node][bucketSpace].movingOut;
+ ++_total_stats.movingOut;
+}
+
+void
+NodeMaintenanceStatsTracker::incSyncing(uint16_t node, document::BucketSpace bucketSpace) {
+ ++_node_stats[node][bucketSpace].syncing;
+ ++_total_stats.syncing;
+}
+
+void
+NodeMaintenanceStatsTracker::incCopyingIn(uint16_t node, document::BucketSpace bucketSpace) {
+ ++_node_stats[node][bucketSpace].copyingIn;
+ ++_total_stats.copyingIn;
+}
+
+void
+NodeMaintenanceStatsTracker::incCopyingOut(uint16_t node, document::BucketSpace bucketSpace) {
+ ++_node_stats[node][bucketSpace].copyingOut;
+ ++_total_stats.copyingOut;
+}
+
+void
+NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker::incTotal(uint16_t node, document::BucketSpace bucketSpace) {
+ ++_node_stats[node][bucketSpace].total;
+ ++_total_stats.total;
+}
+
+const NodeMaintenanceStats&
+NodeMaintenanceStatsTracker::forNode(uint16_t node, document::BucketSpace bucketSpace) const {
+ auto nodeItr = _node_stats.find(node);
+ if (nodeItr != _node_stats.end()) {
+ auto bucketSpaceItr = nodeItr->second.find(bucketSpace);
+ if (bucketSpaceItr != nodeItr->second.end()) {
+ return bucketSpaceItr->second;
+ }
+ }
+ return _emptyNodeMaintenanceStats;
+}
+
+bool
+NodeMaintenanceStatsTracker::operator==(const NodeMaintenanceStatsTracker& rhs) const noexcept {
+ return ((_node_stats == rhs._node_stats) &&
+ (_max_observed_time_since_last_gc == rhs._max_observed_time_since_last_gc));
+}
+
+void
NodeMaintenanceStatsTracker::merge(const NodeMaintenanceStatsTracker& rhs)
{
for (const auto& entry : rhs._node_stats) {
@@ -45,13 +95,24 @@ operator<<(std::ostream& os, const NodeMaintenanceStats& stats)
return os;
}
-NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker()
+NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker() noexcept
: _node_stats(),
_total_stats(),
_max_observed_time_since_last_gc(0)
{}
+NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker(NodeMaintenanceStatsTracker &&) noexcept = default;
+NodeMaintenanceStatsTracker & NodeMaintenanceStatsTracker::operator =(NodeMaintenanceStatsTracker &&) noexcept = default;
+NodeMaintenanceStatsTracker::NodeMaintenanceStatsTracker(const NodeMaintenanceStatsTracker &) = default;
NodeMaintenanceStatsTracker::~NodeMaintenanceStatsTracker() = default;
+void
+NodeMaintenanceStatsTracker::reset(size_t nodes) {
+ _node_stats.clear();
+ _node_stats.resize(nodes);
+ _total_stats = NodeMaintenanceStats();
+ _max_observed_time_since_last_gc = vespalib::duration::zero();
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
index 3818dd4bacb..84705fbca9d 100644
--- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
+++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <unordered_map>
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/stllike/hash_map.h>
namespace storage::distributor {
@@ -51,8 +51,8 @@ std::ostream& operator<<(std::ostream&, const NodeMaintenanceStats&);
class NodeMaintenanceStatsTracker
{
public:
- using BucketSpacesStats = std::unordered_map<document::BucketSpace, NodeMaintenanceStats, document::BucketSpace::hash>;
- using PerNodeStats = std::unordered_map<uint16_t, BucketSpacesStats>;
+ using BucketSpacesStats = vespalib::hash_map<document::BucketSpace, NodeMaintenanceStats, document::BucketSpace::hash>;
+ using PerNodeStats = vespalib::hash_map<uint16_t, BucketSpacesStats>;
private:
PerNodeStats _node_stats;
@@ -62,33 +62,23 @@ private:
static const NodeMaintenanceStats _emptyNodeMaintenanceStats;
public:
- NodeMaintenanceStatsTracker();
+ NodeMaintenanceStatsTracker() noexcept;
+ NodeMaintenanceStatsTracker(NodeMaintenanceStatsTracker &&) noexcept;
+ NodeMaintenanceStatsTracker & operator =(NodeMaintenanceStatsTracker &&) noexcept;
+ NodeMaintenanceStatsTracker(const NodeMaintenanceStatsTracker &);
~NodeMaintenanceStatsTracker();
+ void reset(size_t nodes);
+ size_t numNodes() const { return _node_stats.size(); }
- void incMovingOut(uint16_t node, document::BucketSpace bucketSpace) {
- ++_node_stats[node][bucketSpace].movingOut;
- ++_total_stats.movingOut;
- }
+ void incMovingOut(uint16_t node, document::BucketSpace bucketSpace);
- void incSyncing(uint16_t node, document::BucketSpace bucketSpace) {
- ++_node_stats[node][bucketSpace].syncing;
- ++_total_stats.syncing;
- }
+ void incSyncing(uint16_t node, document::BucketSpace bucketSpace);
- void incCopyingIn(uint16_t node, document::BucketSpace bucketSpace) {
- ++_node_stats[node][bucketSpace].copyingIn;
- ++_total_stats.copyingIn;
- }
+ void incCopyingIn(uint16_t node, document::BucketSpace bucketSpace);
- void incCopyingOut(uint16_t node, document::BucketSpace bucketSpace) {
- ++_node_stats[node][bucketSpace].copyingOut;
- ++_total_stats.copyingOut;
- }
+ void incCopyingOut(uint16_t node, document::BucketSpace bucketSpace);
- void incTotal(uint16_t node, document::BucketSpace bucketSpace) {
- ++_node_stats[node][bucketSpace].total;
- ++_total_stats.total;
- }
+ void incTotal(uint16_t node, document::BucketSpace bucketSpace);
void update_observed_time_since_last_gc(vespalib::duration time_since_gc) noexcept {
_max_observed_time_since_last_gc = std::max(time_since_gc, _max_observed_time_since_last_gc);
@@ -98,18 +88,9 @@ public:
* Returned statistics for a given node index and bucket space, or all zero statistics
* if none have been recorded yet
*/
- const NodeMaintenanceStats& forNode(uint16_t node, document::BucketSpace bucketSpace) const {
- auto nodeItr = _node_stats.find(node);
- if (nodeItr != _node_stats.end()) {
- auto bucketSpaceItr = nodeItr->second.find(bucketSpace);
- if (bucketSpaceItr != nodeItr->second.end()) {
- return bucketSpaceItr->second;
- }
- }
- return _emptyNodeMaintenanceStats;
- }
+ const NodeMaintenanceStats& forNode(uint16_t node, document::BucketSpace bucketSpace) const;
- const PerNodeStats& perNodeStats() const {
+ const PerNodeStats& perNodeStats() const noexcept {
return _node_stats;
}
@@ -124,10 +105,7 @@ public:
return _max_observed_time_since_last_gc;
}
- bool operator==(const NodeMaintenanceStatsTracker& rhs) const {
- return ((_node_stats == rhs._node_stats) &&
- (_max_observed_time_since_last_gc == rhs._max_observed_time_since_last_gc));
- }
+ bool operator==(const NodeMaintenanceStatsTracker& rhs) const noexcept;
void merge(const NodeMaintenanceStatsTracker& rhs);
};
diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
index afcbef32584..e0c1abaaffa 100644
--- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
+++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
@@ -41,11 +41,20 @@ SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanc
perNodeStats.merge(rhs.perNodeStats);
}
-SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() = default;
+SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() noexcept = default;
SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default;
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default;
+SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(PendingMaintenanceStats &&) noexcept = default;
SimpleMaintenanceScanner::PendingMaintenanceStats &
-SimpleMaintenanceScanner::PendingMaintenanceStats::operator = (const PendingMaintenanceStats &) = default;
+SimpleMaintenanceScanner::PendingMaintenanceStats::operator = (PendingMaintenanceStats &&) noexcept = default;
+
+SimpleMaintenanceScanner::PendingMaintenanceStats
+SimpleMaintenanceScanner::PendingMaintenanceStats::reset() {
+ PendingMaintenanceStats prev = std::move(*this);
+ global = GlobalMaintenanceStats();
+ perNodeStats.reset(prev.perNodeStats.numNodes());
+ return prev;
+}
MaintenanceScanner::ScanResult
SimpleMaintenanceScanner::scanNext()
@@ -68,12 +77,12 @@ SimpleMaintenanceScanner::scanNext()
}
}
-void
+SimpleMaintenanceScanner::PendingMaintenanceStats
SimpleMaintenanceScanner::reset()
{
_bucketCursor = document::BucketId();
_bucketSpaceItr = _bucketSpaceRepo.begin();
- _pendingMaintenance = PendingMaintenanceStats();
+ return _pendingMaintenance.reset();
}
void
diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
index 7af61815c31..35b022c7af7 100644
--- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
+++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h
@@ -23,11 +23,14 @@ public:
void merge(const GlobalMaintenanceStats& rhs) noexcept;
};
struct PendingMaintenanceStats {
- PendingMaintenanceStats();
+ PendingMaintenanceStats() noexcept;
PendingMaintenanceStats(const PendingMaintenanceStats &);
- PendingMaintenanceStats &operator = (const PendingMaintenanceStats &);
+ PendingMaintenanceStats &operator = (const PendingMaintenanceStats &) = delete;
+ PendingMaintenanceStats(PendingMaintenanceStats &&) noexcept;
+ PendingMaintenanceStats &operator = (PendingMaintenanceStats &&) noexcept;
~PendingMaintenanceStats();
- GlobalMaintenanceStats global;
+ PendingMaintenanceStats reset();
+ GlobalMaintenanceStats global;
NodeMaintenanceStatsTracker perNodeStats;
void merge(const PendingMaintenanceStats& rhs);
@@ -50,11 +53,12 @@ public:
~SimpleMaintenanceScanner() override;
ScanResult scanNext() override;
- void reset();
+ PendingMaintenanceStats reset();
// TODO: move out into own interface!
void prioritizeBucket(const document::Bucket &id);
+ // TODO Only for testing
const PendingMaintenanceStats& getPendingMaintenanceStats() const noexcept {
return _pendingMaintenance;
}