diff options
author | Arne Juul <arnej@verizonmedia.com> | 2021-06-11 14:48:47 +0000 |
---|---|---|
committer | Arne Juul <arnej@verizonmedia.com> | 2021-06-11 14:48:47 +0000 |
commit | 339b58f9437e0c60cc8107aa0252a783d2790e27 (patch) | |
tree | 21c440ad72ab1be1487c19e14ed20be554532a84 /storage/src | |
parent | db8530327e2b5348239925013366665c012381d9 (diff) | |
parent | f163dd2b355819e490fd8f2e0327a3a6950bb94a (diff) |
Merge branch 'master' into arnej/add-report-connectivity-rpc
Conflicts:
configd/src/apps/sentinel/connectivity.cpp
Diffstat (limited to 'storage/src')
16 files changed, 408 insertions, 26 deletions
diff --git a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp index e1010285dba..934ecc7456b 100644 --- a/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp +++ b/storage/src/tests/distributor/distributor_host_info_reporter_test.cpp @@ -11,10 +11,12 @@ namespace storage::distributor { -using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; using End = vespalib::JsonStream::End; using File = vespalib::File; +using MinReplicaStats = std::unordered_map<uint16_t, uint32_t>; using Object = vespalib::JsonStream::Object; +using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; +using BucketSpacesStats = BucketSpacesStatsProvider::BucketSpacesStats; using namespace ::testing; struct DistributorHostInfoReporterTest : Test { @@ -35,7 +37,7 @@ namespace { // My kingdom for GoogleMock! struct MockedMinReplicaProvider : MinReplicaProvider { - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; std::unordered_map<uint16_t, uint32_t> getMinReplica() const override { return minReplica; @@ -121,7 +123,7 @@ struct Fixture { TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) { Fixture f; - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -133,10 +135,30 @@ TEST_F(DistributorHostInfoReporterTest, min_replica_stats_are_reported) { EXPECT_EQ(9, getMinReplica(root, 5)); } +TEST_F(DistributorHostInfoReporterTest, merge_min_replica_stats) { + + MinReplicaStats min_replica_a; + min_replica_a[3] = 2; + min_replica_a[5] = 4; + + MinReplicaStats min_replica_b; + min_replica_b[5] = 6; + min_replica_b[7] = 8; + + MinReplicaStats result; + merge_min_replica_stats(result, min_replica_a); + merge_min_replica_stats(result, min_replica_b); + + EXPECT_EQ(3, result.size()); + EXPECT_EQ(2, result[3]); + EXPECT_EQ(4, result[5]); + EXPECT_EQ(8, result[7]); +} + TEST_F(DistributorHostInfoReporterTest, generate_example_json) { Fixture f; - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -175,7 +197,7 @@ TEST_F(DistributorHostInfoReporterTest, no_report_generated_if_disabled) { Fixture f; f.reporter.enableReporting(false); - std::unordered_map<uint16_t, uint32_t> minReplica; + MinReplicaStats minReplica; minReplica[0] = 2; minReplica[5] = 9; f.minReplicaProvider.minReplica = minReplica; @@ -210,5 +232,41 @@ TEST_F(DistributorHostInfoReporterTest, bucket_spaces_stats_are_reported) { } } +TEST_F(DistributorHostInfoReporterTest, merge_per_node_bucket_spaces_stats) { + + PerNodeBucketSpacesStats stats_a; + stats_a[3]["default"] = BucketSpaceStats(3, 2); + stats_a[3]["global"] = BucketSpaceStats(5, 4); + stats_a[5]["default"] = BucketSpaceStats(7, 6); + stats_a[5]["global"] = BucketSpaceStats(9, 8); + + PerNodeBucketSpacesStats stats_b; + stats_b[5]["default"] = BucketSpaceStats(11, 10); + stats_b[5]["global"] = BucketSpaceStats(13, 12); + stats_b[7]["default"] = BucketSpaceStats(15, 14); + + PerNodeBucketSpacesStats result; + merge_per_node_bucket_spaces_stats(result, stats_a); + merge_per_node_bucket_spaces_stats(result, stats_b); + + PerNodeBucketSpacesStats exp; + exp[3]["default"] = BucketSpaceStats(3, 2); + exp[3]["global"] = BucketSpaceStats(5, 4); + exp[5]["default"] = BucketSpaceStats(7+11, 6+10); + exp[5]["global"] = BucketSpaceStats(9+13, 8+12); + exp[7]["default"] = BucketSpaceStats(15, 14); + + EXPECT_EQ(exp, result); } +TEST_F(DistributorHostInfoReporterTest, merge_bucket_space_stats_maintains_valid_flag) { + BucketSpaceStats stats_a(5, 3); + BucketSpaceStats stats_b; + + stats_a.merge(stats_b); + EXPECT_FALSE(stats_a.valid()); + EXPECT_EQ(5, stats_a.bucketsTotal()); + EXPECT_EQ(3, stats_a.bucketsPending()); +} + +} diff --git a/storage/src/tests/distributor/simplemaintenancescannertest.cpp b/storage/src/tests/distributor/simplemaintenancescannertest.cpp index 58dc2430041..1bf3809b135 100644 --- a/storage/src/tests/distributor/simplemaintenancescannertest.cpp +++ b/storage/src/tests/distributor/simplemaintenancescannertest.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/distributor/maintenancemocks.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_bucket_space_repo.h> @@ -209,4 +210,88 @@ TEST_F(SimpleMaintenanceScannerTest, per_node_maintenance_stats_are_tracked) { } } +TEST_F(SimpleMaintenanceScannerTest, merge_node_maintenance_stats) { + + NodeMaintenanceStats stats_a; + stats_a.movingOut = 1; + stats_a.syncing = 2; + stats_a.copyingIn = 3; + stats_a.copyingOut = 4; + stats_a.total = 5; + + NodeMaintenanceStats stats_b; + stats_b.movingOut = 10; + stats_b.syncing = 20; + stats_b.copyingIn = 30; + stats_b.copyingOut = 40; + stats_b.total = 50; + + NodeMaintenanceStats result; + result.merge(stats_a); + result.merge(stats_b); + + NodeMaintenanceStats exp; + exp.movingOut = 11; + exp.syncing = 22; + exp.copyingIn = 33; + exp.copyingOut = 44; + exp.total = 55; + EXPECT_EQ(exp, result); +} + +TEST_F(SimpleMaintenanceScannerTest, merge_pending_maintenance_stats) { + auto default_space = document::FixedBucketSpaces::default_space(); + auto global_space = document::FixedBucketSpaces::global_space(); + + PendingStats stats_a; + stats_a.global.pending[MaintenanceOperation::DELETE_BUCKET] = 1; + stats_a.global.pending[MaintenanceOperation::MERGE_BUCKET] = 2; + stats_a.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 3; + stats_a.global.pending[MaintenanceOperation::JOIN_BUCKET] = 4; + stats_a.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 5; + stats_a.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 6; + stats_a.perNodeStats.incMovingOut(3, default_space); + stats_a.perNodeStats.incSyncing(3, global_space); + stats_a.perNodeStats.incCopyingIn(5, default_space); + stats_a.perNodeStats.incCopyingOut(5, global_space); + stats_a.perNodeStats.incTotal(5, default_space); + + PendingStats stats_b; + stats_b.global.pending[MaintenanceOperation::DELETE_BUCKET] = 10; + stats_b.global.pending[MaintenanceOperation::MERGE_BUCKET] = 20; + stats_b.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 30; + stats_b.global.pending[MaintenanceOperation::JOIN_BUCKET] = 40; + stats_b.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 50; + stats_b.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 60; + stats_b.perNodeStats.incMovingOut(7, default_space); + stats_b.perNodeStats.incSyncing(7, global_space); + stats_b.perNodeStats.incCopyingIn(5, default_space); + stats_b.perNodeStats.incCopyingOut(5, global_space); + stats_b.perNodeStats.incTotal(5, default_space); + + PendingStats result; + result.merge(stats_a); + result.merge(stats_b); + + PendingStats exp; + exp.global.pending[MaintenanceOperation::DELETE_BUCKET] = 11; + exp.global.pending[MaintenanceOperation::MERGE_BUCKET] = 22; + exp.global.pending[MaintenanceOperation::SPLIT_BUCKET] = 33; + exp.global.pending[MaintenanceOperation::JOIN_BUCKET] = 44; + exp.global.pending[MaintenanceOperation::SET_BUCKET_STATE] = 55; + exp.global.pending[MaintenanceOperation::GARBAGE_COLLECTION] = 66; + exp.perNodeStats.incMovingOut(3, default_space); + exp.perNodeStats.incSyncing(3, global_space); + exp.perNodeStats.incCopyingIn(5, default_space); + exp.perNodeStats.incCopyingIn(5, default_space); + exp.perNodeStats.incCopyingOut(5, global_space); + exp.perNodeStats.incCopyingOut(5, global_space); + exp.perNodeStats.incTotal(5, default_space); + exp.perNodeStats.incTotal(5, default_space); + exp.perNodeStats.incMovingOut(7, default_space); + exp.perNodeStats.incSyncing(7, global_space); + EXPECT_EQ(exp.global, result.global); + EXPECT_EQ(exp.perNodeStats, result.perNodeStats); +} + } diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 7b048e9f109..231361d72d6 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(storage_distributor bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucket_space_state_map.cpp + bucket_spaces_stats_provider.cpp bucketdbupdater.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp @@ -22,6 +23,7 @@ vespa_add_library(storage_distributor distributor_stripe_component.cpp distributor_stripe_pool.cpp distributor_stripe_thread.cpp + distributor_total_metrics.cpp distributormessagesender.cpp distributormetricsset.cpp externaloperationhandler.cpp @@ -29,6 +31,7 @@ vespa_add_library(storage_distributor idealstatemanager.cpp idealstatemetricsset.cpp messagetracker.cpp + min_replica_provider.cpp multi_threaded_stripe_access_guard.cpp nodeinfo.cpp operation_routing_snapshot.cpp diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp new file mode 100644 index 00000000000..2b12d437aaa --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.cpp @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucket_spaces_stats_provider.h" + +namespace storage::distributor { + +std::ostream& +operator<<(std::ostream& out, const BucketSpaceStats& stats) +{ + out << "{valid=" << stats.valid() << ", bucketsTotal=" << stats.bucketsTotal() << ", bucketsPending=" << stats.bucketsPending() << "}"; + return out; +} + +void +merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest, + const BucketSpacesStatsProvider::BucketSpacesStats& src) +{ + for (const auto& entry : src) { + const auto& bucket_space_name = entry.first; + auto itr = dest.find(bucket_space_name); + if (itr != dest.end()) { + itr->second.merge(entry.second); + } else { + // We need to explicitly handle this case to avoid creating an empty BucketSpaceStats that is not valid. + dest[bucket_space_name] = entry.second; + } + } +} + +void +merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest, + const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src) +{ + for (const auto& entry : src) { + auto node_index = entry.first; + merge_bucket_spaces_stats(dest[node_index], entry.second); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h index 3d7b60f4471..c8ba04ed1ab 100644 --- a/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h +++ b/storage/src/vespa/storage/distributor/bucket_spaces_stats_provider.h @@ -3,6 +3,7 @@ #include <vespa/vespalib/stllike/string.h> #include <map> +#include <ostream> #include <unordered_map> namespace storage::distributor { @@ -32,8 +33,22 @@ public: bool valid() const noexcept { return _valid; } size_t bucketsTotal() const noexcept { return _bucketsTotal; } size_t bucketsPending() const noexcept { return _bucketsPending; } + + bool operator==(const BucketSpaceStats& rhs) const { + return (_valid == rhs._valid) && + (_bucketsTotal == rhs._bucketsTotal) && + (_bucketsPending == rhs._bucketsPending); + } + + void merge(const BucketSpaceStats& rhs) { + _valid = _valid && rhs._valid; + _bucketsTotal += rhs._bucketsTotal; + _bucketsPending += rhs._bucketsPending; + } }; +std::ostream& operator<<(std::ostream& out, const BucketSpaceStats& stats); + /** * Interface that provides snapshots of bucket spaces statistics per content node. */ @@ -48,4 +63,10 @@ public: virtual PerNodeBucketSpacesStats getBucketSpacesStats() const = 0; }; +void merge_bucket_spaces_stats(BucketSpacesStatsProvider::BucketSpacesStats& dest, + const BucketSpacesStatsProvider::BucketSpacesStats& src); + +void merge_per_node_bucket_spaces_stats(BucketSpacesStatsProvider::PerNodeBucketSpacesStats& dest, + const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& src); + } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 7ac15b3929c..be9ad1179fb 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -9,7 +9,7 @@ #include "distributor_stripe.h" #include "distributor_stripe_pool.h" #include "distributor_stripe_thread.h" -#include "distributormetricsset.h" +#include "distributor_total_metrics.h" #include "idealstatemetricsset.h" #include "multi_threaded_stripe_access_guard.h" #include "operation_sequencer.h" @@ -59,11 +59,12 @@ Distributor::Distributor(DistributorComponentRegister& compReg, : StorageLink("distributor"), framework::StatusReporter("distributor", "Distributor"), _comp_reg(compReg), + _use_legacy_mode(num_distributor_stripes == 0), _metrics(std::make_shared<DistributorMetricSet>()), + _total_metrics(_use_legacy_mode ? std::shared_ptr<DistributorTotalMetrics>() : std::make_shared<DistributorTotalMetrics>(num_distributor_stripes)), _messageSender(messageSender), - _use_legacy_mode(num_distributor_stripes == 0), _n_stripe_bits(0), - _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, + _stripe(std::make_unique<DistributorStripe>(compReg, _use_legacy_mode ? *_metrics : _total_metrics->stripe(0), node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), _stripe_pool(stripe_pool), _stripes(), @@ -91,7 +92,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _next_distribution(), _current_internal_config_generation(_component.internal_config_generation()) { - _component.registerMetric(*_metrics); + _component.registerMetric(_use_legacy_mode ? *_metrics : *_total_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); @@ -105,7 +106,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, *_stripe_accessor); _stripes.emplace_back(std::move(_stripe)); for (size_t i = 1; i < num_distributor_stripes; ++i) { - _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, + _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, _total_metrics->stripe(i), node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode, i)); } _stripe_scan_stats.resize(num_distributor_stripes); @@ -124,6 +125,12 @@ Distributor::~Distributor() closeNextLink(); } +DistributorMetricSet& +Distributor::getMetrics() +{ + return _use_legacy_mode ? *_metrics : _total_metrics->bucket_db_updater_metrics(); +} + // TODO STRIPE remove DistributorStripe& Distributor::first_stripe() noexcept { @@ -322,6 +329,7 @@ namespace { bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& msg) noexcept { switch (msg.getType().getId()) { case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::GETNODESTATE_ID: case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: return true; case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: @@ -521,44 +529,54 @@ Distributor::propagateDefaultDistribution( std::unordered_map<uint16_t, uint32_t> Distributor::getMinReplica() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->getMinReplica(); } else { - return first_stripe().getMinReplica(); + std::unordered_map<uint16_t, uint32_t> result; + for (const auto& stripe : _stripes) { + merge_min_replica_stats(result, stripe->getMinReplica()); + } + return result; } } BucketSpacesStatsProvider::PerNodeBucketSpacesStats Distributor::getBucketSpacesStats() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->getBucketSpacesStats(); } else { - return first_stripe().getBucketSpacesStats(); + BucketSpacesStatsProvider::PerNodeBucketSpacesStats result; + for (const auto& stripe : _stripes) { + merge_per_node_bucket_spaces_stats(result, stripe->getBucketSpacesStats()); + } + return result; } } SimpleMaintenanceScanner::PendingMaintenanceStats Distributor::pending_maintenance_stats() const { - // TODO STRIPE merged snapshot from all stripes if (_use_legacy_mode) { return _stripe->pending_maintenance_stats(); } else { - return first_stripe().pending_maintenance_stats(); + SimpleMaintenanceScanner::PendingMaintenanceStats result; + for (const auto& stripe : _stripes) { + result.merge(stripe->pending_maintenance_stats()); + } + return result; } } void Distributor::propagateInternalScanMetricsToExternal() { - // TODO STRIPE propagate to all stripes - // TODO STRIPE reconsider metric wiring... if (_use_legacy_mode) { _stripe->propagateInternalScanMetricsToExternal(); } else { - first_stripe().propagateInternalScanMetricsToExternal(); + for (auto &stripe : _stripes) { + stripe->propagateInternalScanMetricsToExternal(); + } + _total_metrics->aggregate(); } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 6f9654b78b8..48ad061859f 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -44,6 +44,7 @@ class DistributorBucketSpaceRepo; class DistributorStatus; class DistributorStripe; class DistributorStripePool; +class DistributorTotalMetrics; class StripeAccessor; class OperationSequencer; class OwnershipTransferSafeTimePointCalculator; @@ -78,7 +79,7 @@ public: void sendUp(const std::shared_ptr<api::StorageMessage>&) override; void sendDown(const std::shared_ptr<api::StorageMessage>&) override; - DistributorMetricSet& getMetrics() { return *_metrics; } + DistributorMetricSet& getMetrics(); // Implements DistributorInterface and DistributorMessageSender. DistributorMetricSet& metrics() override { return getMetrics(); } @@ -201,9 +202,10 @@ private: using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; DistributorComponentRegister& _comp_reg; + const bool _use_legacy_mode; std::shared_ptr<DistributorMetricSet> _metrics; + std::shared_ptr<DistributorTotalMetrics> _total_metrics; ChainedMessageSender* _messageSender; - const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. uint8_t _n_stripe_bits; std::unique_ptr<DistributorStripe> _stripe; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 4f6e2d5016b..c94c1a415b0 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -748,7 +748,7 @@ void DistributorStripe::send_updated_host_info_if_required() { if (_use_legacy_mode) { _component.getStateUpdater().immediately_send_get_node_state_replies(); } else { - _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index! + _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index); } _must_send_updated_host_info = false; } diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp new file mode 100644 index 00000000000..543712cc4d2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.cpp @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "distributor_total_metrics.h" + +namespace storage::distributor { + +DistributorTotalMetrics::DistributorTotalMetrics(uint32_t num_distributor_stripes) + : DistributorMetricSet(), + _stripes_metrics(), + _bucket_db_updater_metrics() +{ + _stripes_metrics.reserve(num_distributor_stripes); + for (uint32_t i = 0; i < num_distributor_stripes; ++i) { + _stripes_metrics.emplace_back(std::make_shared<DistributorMetricSet>()); + } +} + +DistributorTotalMetrics::~DistributorTotalMetrics() = default; + +void +DistributorTotalMetrics::aggregate() +{ + DistributorMetricSet::reset(); + _bucket_db_updater_metrics.addToPart(*this); + for (auto &stripe_metrics : _stripes_metrics) { + stripe_metrics->addToPart(*this); + } +} + +void +DistributorTotalMetrics::reset() +{ + DistributorMetricSet::reset(); + _bucket_db_updater_metrics.reset(); + for (auto &stripe_metrics : _stripes_metrics) { + stripe_metrics->reset(); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/distributor_total_metrics.h b/storage/src/vespa/storage/distributor/distributor_total_metrics.h new file mode 100644 index 00000000000..14116af3d3b --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_total_metrics.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "distributormetricsset.h" + +namespace storage::distributor { + +/* + * Class presenting total metrics (as a DistributorMetricSet) to the + * metric framework, while managing a DistributorMetricSet for each + * stripe and an extra one for the top level bucket db updater. + */ +class DistributorTotalMetrics : public DistributorMetricSet +{ + std::vector<std::shared_ptr<DistributorMetricSet>> _stripes_metrics; + DistributorMetricSet _bucket_db_updater_metrics; +public: + explicit DistributorTotalMetrics(uint32_t num_distributor_stripes); + ~DistributorTotalMetrics() override; + void aggregate(); + void reset() override; + DistributorMetricSet& stripe(uint32_t stripe_index) { return *_stripes_metrics[stripe_index]; } + DistributorMetricSet& bucket_db_updater_metrics() { return _bucket_db_updater_metrics; } +}; + +} diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp index b954ef93c76..4e7f7d9d89d 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.cpp @@ -7,6 +7,39 @@ namespace storage::distributor { const NodeMaintenanceStats NodeMaintenanceStatsTracker::_emptyNodeMaintenanceStats; +void +NodeMaintenanceStats::merge(const NodeMaintenanceStats& rhs) +{ + movingOut += rhs.movingOut; + syncing += rhs.syncing; + copyingIn += rhs.copyingIn; + copyingOut += rhs.copyingOut; + total += rhs.total; +} + +namespace { + +void +merge_bucket_spaces_stats(NodeMaintenanceStatsTracker::BucketSpacesStats& dest, + const NodeMaintenanceStatsTracker::BucketSpacesStats& src) +{ + for (const auto& entry : src) { + auto bucket_space = entry.first; + dest[bucket_space].merge(entry.second); + } +} + +} + +void +NodeMaintenanceStatsTracker::merge(const NodeMaintenanceStatsTracker& rhs) +{ + for (const auto& entry : rhs._stats) { + auto node_index = entry.first; + merge_bucket_spaces_stats(_stats[node_index], entry.second); + } +} + std::ostream& operator<<(std::ostream& os, const NodeMaintenanceStats& stats) { diff --git a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h index faf253fc84c..6399e53089b 100644 --- a/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h +++ b/storage/src/vespa/storage/distributor/maintenance/node_maintenance_stats_tracker.h @@ -37,6 +37,8 @@ struct NodeMaintenanceStats bool operator!=(const NodeMaintenanceStats& other) const noexcept { return !(*this == other); } + + void merge(const NodeMaintenanceStats& rhs); }; std::ostream& operator<<(std::ostream&, const NodeMaintenanceStats&); @@ -93,6 +95,11 @@ public: const PerNodeStats& perNodeStats() const { return _stats; } + + bool operator==(const NodeMaintenanceStatsTracker& rhs) const { + return _stats == rhs._stats; + } + void merge(const NodeMaintenanceStatsTracker& rhs); }; } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp index 15a57c1e7ee..2bfce9569cc 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp @@ -19,6 +19,28 @@ SimpleMaintenanceScanner::SimpleMaintenanceScanner(BucketPriorityDatabase& bucke SimpleMaintenanceScanner::~SimpleMaintenanceScanner() = default; +bool +SimpleMaintenanceScanner::GlobalMaintenanceStats::operator==(const GlobalMaintenanceStats& rhs) const +{ + return pending == rhs.pending; +} + +void +SimpleMaintenanceScanner::GlobalMaintenanceStats::merge(const GlobalMaintenanceStats& rhs) +{ + assert(pending.size() == rhs.pending.size()); + for (size_t i = 0; i < pending.size(); ++i) { + pending[i] += rhs.pending[i]; + } +} + +void +SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanceStats& rhs) +{ + global.merge(rhs.global); + perNodeStats.merge(rhs.perNodeStats); +} + SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() = default; SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default; SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default; diff --git a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h index 254b3244171..69e63fd4c65 100644 --- a/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h +++ b/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.h @@ -18,6 +18,9 @@ public: GlobalMaintenanceStats() : pending(MaintenanceOperation::OPERATION_COUNT) { } + + bool operator==(const GlobalMaintenanceStats& rhs) const; + void merge(const GlobalMaintenanceStats& rhs); }; struct PendingMaintenanceStats { PendingMaintenanceStats(); @@ -26,6 +29,8 @@ public: ~PendingMaintenanceStats(); GlobalMaintenanceStats global; NodeMaintenanceStatsTracker perNodeStats; + + void merge(const PendingMaintenanceStats& rhs); }; private: BucketPriorityDatabase& _bucketPriorityDb; diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.cpp b/storage/src/vespa/storage/distributor/min_replica_provider.cpp new file mode 100644 index 00000000000..c9929940560 --- /dev/null +++ b/storage/src/vespa/storage/distributor/min_replica_provider.cpp @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "min_replica_provider.h" + +namespace storage::distributor { + +void +merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest, + const std::unordered_map<uint16_t, uint32_t>& src) +{ + for (const auto& entry : src) { + auto node_index = entry.first; + auto itr = dest.find(node_index); + auto new_min_replica = (itr != dest.end()) ? std::min(itr->second, entry.second) : entry.second; + dest[node_index] = new_min_replica; + } +} + +} diff --git a/storage/src/vespa/storage/distributor/min_replica_provider.h b/storage/src/vespa/storage/distributor/min_replica_provider.h index 6d644f4e9d4..ba946cd5a7f 100644 --- a/storage/src/vespa/storage/distributor/min_replica_provider.h +++ b/storage/src/vespa/storage/distributor/min_replica_provider.h @@ -4,8 +4,7 @@ #include <stdint.h> #include <unordered_map> -namespace storage { -namespace distributor { +namespace storage::distributor { class MinReplicaProvider { @@ -21,5 +20,8 @@ public: virtual std::unordered_map<uint16_t, uint32_t> getMinReplica() const = 0; }; -} // distributor -} // storage +void merge_min_replica_stats(std::unordered_map<uint16_t, uint32_t>& dest, + const std::unordered_map<uint16_t, uint32_t>& src); + +} + |