aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-06-15 11:38:40 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-06-15 11:38:40 +0000
commit8bd2eca7f2f26249c910b71f1686566bec69a754 (patch)
tree2d11e7457ededf8d714f95998a705d0b17447736
parent8628cb25532508e615a8bfe580799f20d0c5afe7 (diff)
Add per stripe handling of ideal state metrics with aggregation on top.
This is handled similarly to per stripe distributor metrics.
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp20
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h1
-rw-r--r--storage/src/vespa/storage/distributor/ideal_state_total_metrics.cpp51
-rw-r--r--storage/src/vespa/storage/distributor/ideal_state_total_metrics.h28
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h26
9 files changed, 127 insertions, 36 deletions
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 231361d72d6..eba76c91af0 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -28,6 +28,7 @@ vespa_add_library(storage_distributor
distributormetricsset.cpp
externaloperationhandler.cpp
ideal_service_layer_nodes_bundle.cpp
+ ideal_state_total_metrics.cpp
idealstatemanager.cpp
idealstatemetricsset.cpp
messagetracker.cpp
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index d6727299ee6..448cbc9809d 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -61,10 +61,17 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_comp_reg(compReg),
_use_legacy_mode(num_distributor_stripes == 0),
_metrics(std::make_shared<DistributorMetricSet>()),
- _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
+ _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() :
+ std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)),
+ _ideal_state_metrics(_use_legacy_mode ? std::make_shared<IdealStateMetricSet>() : std::shared_ptr<IdealStateMetricSet>()),
+ _ideal_state_total_metrics(_use_legacy_mode ? std::shared_ptr<IdealStateTotalMetrics>() :
+ std::make_shared<IdealStateTotalMetrics>(num_distributor_stripes)),
_messageSender(messageSender),
_n_stripe_bits(0),
- _stripe(std::make_unique<DistributorStripe>(compReg, _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), node_identity, threadPool,
+ _stripe(std::make_unique<DistributorStripe>(compReg,
+ _use_legacy_mode ? *_metrics : _total_metrics->stripe(0),
+ _use_legacy_mode ? *_ideal_state_metrics : _ideal_state_total_metrics->stripe(0),
+ node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(stripe_pool),
_stripes(),
@@ -74,6 +81,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_message_queue(),
_fetched_messages(),
_component(*this, compReg, "distributor"),
+ _ideal_state_component(compReg, "Ideal state manager"),
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
_distributorStatusDelegate(compReg, *this, *this),
@@ -93,6 +101,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_current_internal_config_generation(_component.internal_config_generation())
{
_component.registerMetric(_use_legacy_mode ? *_metrics : *_total_metrics);
+ _ideal_state_component.registerMetric(_use_legacy_mode ? *_ideal_state_metrics :
+ *_ideal_state_total_metrics);
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes));
@@ -106,7 +116,10 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
for (size_t i = 1; i < num_distributor_stripes; ++i) {
- _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), node_identity, threadPool,
+ _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg,
+ _total_metrics->stripe(i),
+ _ideal_state_total_metrics->stripe(i),
+ node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode, i));
}
_stripe_scan_stats.resize(num_distributor_stripes);
@@ -565,6 +578,7 @@ Distributor::propagateInternalScanMetricsToExternal()
stripe->propagateInternalScanMetricsToExternal();
}
_total_metrics->aggregate();
+ _ideal_state_total_metrics->aggregate();
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 756346277c9..831750a1c89 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -9,6 +9,7 @@
#include "distributor_interface.h"
#include "distributor_stripe_interface.h"
#include "externaloperationhandler.h"
+#include "ideal_state_total_metrics.h"
#include "idealstatemanager.h"
#include "min_replica_provider.h"
#include "pendingmessagetracker.h"
@@ -201,6 +202,8 @@ private:
const bool _use_legacy_mode;
std::shared_ptr<DistributorMetricSet> _metrics;
std::shared_ptr<DistributorTotalMetrics> _total_metrics;
+ std::shared_ptr<IdealStateMetricSet> _ideal_state_metrics;
+ std::shared_ptr<IdealStateTotalMetrics> _ideal_state_total_metrics;
ChainedMessageSender* _messageSender;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
uint8_t _n_stripe_bits;
@@ -213,6 +216,7 @@ private:
MessageQueue _message_queue; // Queue for top-level ops
MessageQueue _fetched_messages;
distributor::DistributorComponent _component;
+ storage::DistributorComponent _ideal_state_component;
std::shared_ptr<const DistributorConfiguration> _total_config;
std::unique_ptr<BucketDBUpdater> _bucket_db_updater;
StatusReporterDelegate _distributorStatusDelegate;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index c94c1a415b0..837193a1e7c 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -36,6 +36,7 @@ namespace storage::distributor {
*/
DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
DistributorMetricSet& metrics,
+ IdealStateMetricSet& ideal_state_metrics,
const NodeIdentity& node_identity,
framework::TickingThreadPool& threadPool,
DoneInitializeHandler& doneInitHandler,
@@ -58,7 +59,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketDBUpdater(_component, _component, *this, *this, use_legacy_mode),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, stripe_index),
+ _idealStateManager(_component, _component, ideal_state_metrics),
_messageSender(messageSender),
_stripe_host_info_notifier(stripe_host_info_notifier),
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 347863b6d77..e9dcb3e65fc 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -59,6 +59,7 @@ class DistributorStripe final
public:
DistributorStripe(DistributorComponentRegister&,
DistributorMetricSet& metrics,
+ IdealStateMetricSet& ideal_state_metrics,
const NodeIdentity& node_identity,
framework::TickingThreadPool&,
DoneInitializeHandler&,
diff --git a/storage/src/vespa/storage/distributor/ideal_state_total_metrics.cpp b/storage/src/vespa/storage/distributor/ideal_state_total_metrics.cpp
new file mode 100644
index 00000000000..65dcad468fc
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/ideal_state_total_metrics.cpp
@@ -0,0 +1,51 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "ideal_state_total_metrics.h"
+
+namespace storage::distributor {
+
+void
+IdealStateTotalMetrics::aggregate_helper(IdealStateMetricSet& total) const
+{
+ for (auto& stripe_metrics : _stripes_metrics) {
+ stripe_metrics->addToPart(total);
+ }
+}
+
+IdealStateTotalMetrics::IdealStateTotalMetrics(uint32_t num_distributor_stripes)
+ : IdealStateMetricSet(),
+ _stripes_metrics()
+{
+ _stripes_metrics.reserve(num_distributor_stripes);
+ for (uint32_t i = 0; i < num_distributor_stripes; ++i) {
+ _stripes_metrics.emplace_back(std::make_shared<IdealStateMetricSet>());
+ }
+}
+
+IdealStateTotalMetrics::~IdealStateTotalMetrics() = default;
+
+void
+IdealStateTotalMetrics::aggregate()
+{
+ IdealStateMetricSet::reset();
+ aggregate_helper(*this);
+}
+
+void
+IdealStateTotalMetrics::addToSnapshot(Metric& m, std::vector<Metric::UP>& owner_list) const
+{
+ IdealStateMetricSet total;
+ aggregate_helper(total);
+ total.addToSnapshot(m, owner_list);
+}
+
+void
+IdealStateTotalMetrics::reset()
+{
+ IdealStateMetricSet::reset();
+ for (auto& stripe_metrics : _stripes_metrics) {
+ stripe_metrics->reset();
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/ideal_state_total_metrics.h b/storage/src/vespa/storage/distributor/ideal_state_total_metrics.h
new file mode 100644
index 00000000000..c3207baa2f0
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/ideal_state_total_metrics.h
@@ -0,0 +1,28 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "idealstatemetricsset.h"
+
+namespace storage::distributor {
+
+/*
+ * Class presenting total metrics (as an IdealStateMetricSet) to the metric framework,
+ * while managing an IdealStateMetricSet for each distributor stripe.
+ */
+class IdealStateTotalMetrics : public IdealStateMetricSet {
+private:
+ std::vector<std::shared_ptr<IdealStateMetricSet>> _stripes_metrics;
+
+ void aggregate_helper(IdealStateMetricSet& total) const;
+
+public:
+ explicit IdealStateTotalMetrics(uint32_t num_distributor_stripes);
+ ~IdealStateTotalMetrics() override;
+ void aggregate();
+ void addToSnapshot(Metric& m, std::vector<Metric::UP>& owner_list) const override;
+ void reset() override;
+ IdealStateMetricSet& stripe(uint32_t stripe_index) { return *_stripes_metrics[stripe_index]; }
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 013551b8505..65e018765fe 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -25,21 +25,14 @@ namespace storage {
namespace distributor {
IdealStateManager::IdealStateManager(
- DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg,
- uint32_t stripe_index)
- : _metrics(new IdealStateMetricSet),
- _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"),
- _bucketSpaceRepo(bucketSpaceRepo),
+ const DistributorNodeContext& node_ctx,
+ DistributorStripeOperationContext& op_ctx,
+ IdealStateMetricSet& metrics)
+ : _metrics(metrics),
+ _node_ctx(node_ctx),
+ _op_ctx(op_ctx),
_has_logged_phantom_replica_warning(false)
{
- if (stripe_index == 0) {
- // TODO STRIPE: Add proper handling of metrics across distributor stripes
- _distributorComponent.registerMetric(*_metrics);
- }
-
LOG(debug, "Adding BucketStateStateChecker to state checkers");
_stateCheckers.push_back(StateChecker::SP(new BucketStateStateChecker()));
@@ -167,7 +160,7 @@ IdealStateManager::generateHighestPriority(
const document::Bucket &bucket,
NodeMaintenanceStatsTracker& statsTracker) const
{
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ auto& distributorBucketSpace = _op_ctx.bucket_space_repo().get(bucket.getBucketSpace());
StateChecker::Context c(node_context(), operation_context(), distributorBucketSpace, statsTracker, bucket);
fillParentAndChildBuckets(c);
fillSiblingBucket(c);
@@ -204,7 +197,7 @@ IdealStateManager::generateInterceptingSplit(BucketSpace bucketSpace,
{
NodeMaintenanceStatsTracker statsTracker;
document::Bucket bucket(bucketSpace, e.getBucketId());
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ auto& distributorBucketSpace = _op_ctx.bucket_space_repo().get(bucket.getBucketSpace());
StateChecker::Context c(node_context(), operation_context(), distributorBucketSpace, statsTracker, bucket);
if (e.valid()) {
c.entry = e;
@@ -239,7 +232,7 @@ std::vector<MaintenanceOperation::SP>
IdealStateManager::generateAll(const document::Bucket &bucket,
NodeMaintenanceStatsTracker& statsTracker) const
{
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ auto& distributorBucketSpace = _op_ctx.bucket_space_repo().get(bucket.getBucketSpace());
StateChecker::Context c(node_context(), operation_context(), distributorBucketSpace, statsTracker, bucket);
fillParentAndChildBuckets(c);
fillSiblingBucket(c);
@@ -291,7 +284,7 @@ IdealStateManager::getBucketStatus(
void IdealStateManager::dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
StatusBucketVisitor proc(*this, bucket_space, out);
- auto &distributorBucketSpace(_bucketSpaceRepo.get(bucket_space));
+ auto& distributorBucketSpace = _op_ctx.bucket_space_repo().get(bucket_space);
distributorBucketSpace.getBucketDatabase().forEach(proc);
}
@@ -299,7 +292,7 @@ void IdealStateManager::getBucketStatus(std::ostream& out) const {
LOG(debug, "Dumping bucket database valid at cluster state version %u",
operation_context().cluster_state_bundle().getVersion());
- for (auto& space : _bucketSpaceRepo) {
+ for (auto& space : _op_ctx.bucket_space_repo()) {
out << "<h2>" << document::FixedBucketSpaces::to_string(space.first) << " - " << space.first << "</h2>\n";
dump_bucket_space_db_status(space.first, out);
}
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 041e009ee9f..c0fa7dd70ab 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -33,11 +33,9 @@ class IdealStateManager : public MaintenancePriorityGenerator,
{
public:
- IdealStateManager(DistributorStripeInterface& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg,
- uint32_t stripe_index = 0);
+ IdealStateManager(const DistributorNodeContext& node_ctx,
+ DistributorStripeOperationContext& op_ctx,
+ IdealStateMetricSet& metrics);
~IdealStateManager() override;
@@ -66,18 +64,18 @@ public:
const BucketDatabase::Entry& e,
api::StorageMessage::Priority pri);
- IdealStateMetricSet& getMetrics() { return *_metrics; }
+ IdealStateMetricSet& getMetrics() { return _metrics; }
void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const;
void getBucketStatus(std::ostream& out) const;
- const DistributorNodeContext& node_context() const { return _distributorComponent; }
- DistributorStripeOperationContext& operation_context() { return _distributorComponent; }
- const DistributorStripeOperationContext& operation_context() const { return _distributorComponent; }
- DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _bucketSpaceRepo; }
- const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _bucketSpaceRepo; }
+ const DistributorNodeContext& node_context() const { return _node_ctx; }
+ DistributorStripeOperationContext& operation_context() { return _op_ctx; }
+ const DistributorStripeOperationContext& operation_context() const { return _op_ctx; }
+ DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _op_ctx.bucket_space_repo(); }
+ const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _op_ctx.bucket_space_repo(); }
private:
void verify_only_live_nodes_in_context(const StateChecker::Context& c) const;
@@ -90,7 +88,7 @@ private:
BucketDatabase::Entry* getEntryForPrimaryBucket(StateChecker::Context& c) const;
- std::shared_ptr<IdealStateMetricSet> _metrics;
+ IdealStateMetricSet& _metrics;
document::BucketId _lastPrioritizedBucket;
// Prioritized of state checkers that generate operations
@@ -98,8 +96,8 @@ private:
std::vector<StateChecker::SP> _stateCheckers;
SplitBucketStateChecker* _splitBucketStateChecker;
- DistributorStripeComponent _distributorComponent;
- DistributorBucketSpaceRepo& _bucketSpaceRepo;
+ const DistributorNodeContext& _node_ctx;
+ DistributorStripeOperationContext& _op_ctx;
mutable bool _has_logged_phantom_replica_warning;
bool iAmUp() const;