diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-09-17 15:33:46 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-09-17 15:33:46 +0200 |
commit | 4b35b944335c0e6a73d7258f573fce2286e80647 (patch) | |
tree | 5edbf1734bcef3becabcf2d5ad1297c273a369ea /searchcore | |
parent | bf17a89d72bd3e83e89a40cbf80a5641593ffff7 (diff) |
Add BmNodeStats, containing info about documents on nodes, buckets
and bucktets with pending ideal state operations.
Diffstat (limited to 'searchcore')
14 files changed, 513 insertions, 2 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 42becd26195..7f9ae442dd3 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -12,6 +12,8 @@ #include <vespa/searchcore/bmcluster/bm_feeder.h> #include <vespa/searchcore/bmcluster/bm_feed_params.h> #include <vespa/searchcore/bmcluster/bm_node.h> +#include <vespa/searchcore/bmcluster/bm_node_stats.h> +#include <vespa/searchcore/bmcluster/bm_node_stats_reporter.h> #include <vespa/searchcore/bmcluster/bm_range.h> #include <vespa/searchcore/bmcluster/bucket_selector.h> #include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h> @@ -45,6 +47,7 @@ using search::bmcluster::BmFeed; using search::bmcluster::BmFeedParams; using search::bmcluster::BmFeeder; using search::bmcluster::BmNode; +using search::bmcluster::BmNodeStatsReporter; using search::bmcluster::BmRange; using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; @@ -160,12 +163,19 @@ void benchmark(const BMParams &bm_params) auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); auto get_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_get_feed(range, bucket_selector); }, f._feed.num_buckets(), "get"); auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); - int64_t time_bias = 1; + BmNodeStatsReporter reporter(cluster); + reporter.start(500ms); + int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count(); LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); benchmark_feed(feeder, time_bias, put_feed, bm_params, bm_params.get_put_passes(), "put"); + reporter.report_now(); benchmark_feed(feeder, time_bias, update_feed, bm_params, bm_params.get_update_passes(), "update"); + reporter.report_now(); benchmark_feed(feeder, time_bias, get_feed, bm_params, bm_params.get_get_passes(), "get"); + reporter.report_now(); benchmark_feed(feeder, time_bias, remove_feed, bm_params, bm_params.get_remove_passes(), "remove"); + reporter.report_now(); + reporter.stop(); LOG(info, "--------------------------------"); cluster.stop(); diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 184ada0ab38..d2ab8b0c2a9 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -1,16 +1,20 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchcore_bmcluster STATIC SOURCES + bm_buckets_stats.cpp bm_cluster.cpp bm_cluster_controller.cpp bm_cluster_params.cpp bm_distribution.cpp + bm_document_db_stats.cpp bm_feed.cpp bm_feeder.cpp bm_feed_params.cpp bm_message_bus.cpp bm_message_bus_routes.cpp bm_node.cpp + bm_node_stats.cpp + bm_node_stats_reporter.cpp bm_storage_chain_builder.cpp bm_storage_link.cpp bm_storage_message_addresses.cpp diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp new file mode 100644 index 00000000000..f4a3a8f2461 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp @@ -0,0 +1,38 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_buckets_stats.h" + +namespace search::bmcluster { + +BmBucketsStats::BmBucketsStats() + : BmBucketsStats(0u, 0u, false) +{ +} + +BmBucketsStats::BmBucketsStats(uint64_t buckets, uint64_t buckets_pending, bool valid) + : _buckets(buckets), + _buckets_pending(buckets_pending), + _valid(valid) +{ +} + +BmBucketsStats::~BmBucketsStats() = default; + +BmBucketsStats& +BmBucketsStats::operator+=(const BmBucketsStats& rhs) +{ + _valid &= rhs._valid; + _buckets += rhs._buckets; + _buckets_pending += rhs._buckets_pending; + return *this; +} + +bool +BmBucketsStats::operator==(const BmBucketsStats &rhs) const +{ + return ((_buckets == rhs._buckets) && + (_buckets_pending == rhs._buckets_pending) && + (_valid == rhs._valid)); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h new file mode 100644 index 00000000000..ca018ff3b50 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> + +namespace search::bmcluster { + +/* + * Class containing bucket stats + */ +class BmBucketsStats +{ + uint64_t _buckets; + uint64_t _buckets_pending; // Buckets with pending ideal state operations + bool _valid; + +public: + BmBucketsStats(); + BmBucketsStats(uint64_t buckets, uint64_t buckets_pending, bool valid); + ~BmBucketsStats(); + BmBucketsStats& operator+=(const BmBucketsStats& rhs); + bool operator==(const BmBucketsStats &rhs) const; + uint64_t get_buckets() const noexcept { return _buckets; } + uint64_t get_buckets_pending() const noexcept { return _buckets_pending; } + bool get_valid() const noexcept { return _valid; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp index adb7f2dd12b..36e1e394f40 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -6,6 +6,7 @@ #include "bm_feed.h" #include "bm_message_bus.h" #include "bm_node.h" +#include "bm_node_stats.h" #include "document_api_message_bus_bm_feed_handler.h" #include "spi_bm_feed_handler.h" #include "storage_api_chain_bm_feed_handler.h" @@ -424,4 +425,17 @@ BmCluster::get_feed_handler() return _feed_handler.get(); } +std::vector<BmNodeStats> +BmCluster::get_node_stats() +{ + std::vector<BmNodeStats> node_stats(_nodes.size()); + storage::lib::ClusterState baseline_state(*_distribution->get_cluster_state_bundle().getBaselineClusterState()); + for (const auto &node : _nodes) { + if (node) { + node->merge_node_stats(node_stats, baseline_state); + } + } + return node_stats; +} + } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h index e8f72ec8254..8615b44bd7e 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h @@ -29,6 +29,7 @@ class BmClusterController; class BmFeed; class BmMessageBus; class BmNode; +class BmNodeStats; class IBmDistribution; class IBmFeedHandler; @@ -88,6 +89,7 @@ public: IBmFeedHandler* get_feed_handler(); uint32_t get_num_nodes() const { return _nodes.size(); } BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; } + std::vector<BmNodeStats> get_node_stats(); }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp new file mode 100644 index 00000000000..93624353026 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp @@ -0,0 +1,42 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_document_db_stats.h" + +namespace search::bmcluster { + +BmDocumentDbStats::BmDocumentDbStats() + : BmDocumentDbStats(0u, 0u, 0u, 0u) +{ +} + +BmDocumentDbStats::BmDocumentDbStats(uint64_t active_docs, uint64_t stored_docs, uint64_t total_docs, uint64_t removed_docs) + : _active_docs(active_docs), + _stored_docs(stored_docs), + _total_docs(total_docs), + _removed_docs(removed_docs) +{ +} + + +BmDocumentDbStats::~BmDocumentDbStats() = default; + +BmDocumentDbStats& +BmDocumentDbStats::operator+=(const BmDocumentDbStats& rhs) +{ + _active_docs += rhs._active_docs; + _stored_docs += rhs._stored_docs; + _total_docs += rhs._total_docs; + _removed_docs += rhs._removed_docs; + return *this; +} + +bool +BmDocumentDbStats::operator==(const BmDocumentDbStats &rhs) const +{ + return ((_active_docs == rhs._active_docs) && + (_stored_docs == rhs._stored_docs) && + (_total_docs == rhs._total_docs) && + (_removed_docs == rhs._removed_docs)); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h new file mode 100644 index 00000000000..33559fcc26f --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> + +namespace search::bmcluster { + +/* + * Class containing stats for a document db + */ +class BmDocumentDbStats +{ + uint64_t _active_docs; + uint64_t _stored_docs; + uint64_t _total_docs; + uint64_t _removed_docs; + +public: + BmDocumentDbStats(); + BmDocumentDbStats(uint64_t active_docs, uint64_t stored_docs, uint64_t total_docs, uint64_t removed_docs); + ~BmDocumentDbStats(); + BmDocumentDbStats& operator+=(const BmDocumentDbStats& rhs); + bool operator==(const BmDocumentDbStats &rhs) const; + uint64_t get_active_docs() const noexcept { return _active_docs; } + uint64_t get_stored_docs() const noexcept { return _stored_docs; } + uint64_t get_total_docs() const noexcept { return _total_docs; } + uint64_t get_removed_docs() const noexcept { return _removed_docs; } +}; + + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 7a297e82655..7ed83ed669a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -4,6 +4,7 @@ #include "bm_cluster.h" #include "bm_cluster_params.h" #include "bm_message_bus.h" +#include "bm_node_stats.h" #include "bm_storage_chain_builder.h" #include "bm_storage_link_context.h" #include "storage_api_chain_bm_feed_handler.h" @@ -44,6 +45,7 @@ #include <vespa/searchcore/proton/server/bootstrapconfig.h> #include <vespa/searchcore/proton/server/documentdb.h> #include <vespa/searchcore/proton/server/document_db_maintenance_config.h> +#include <vespa/searchcore/proton/server/document_meta_store_read_guards.h> #include <vespa/searchcore/proton/server/documentdbconfigmanager.h> #include <vespa/searchcore/proton/server/fileconfigmanager.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> @@ -54,6 +56,7 @@ #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/storage/bucketdb/config-stor-bucket-init.h> #include <vespa/storage/common/i_storage_chain_builder.h> +#include <vespa/storage/common/storagelink.h> #include <vespa/storage/config/config-stor-bouncer.h> #include <vespa/storage/config/config-stor-communicationmanager.h> #include <vespa/storage/config/config-stor-distributormanager.h> @@ -62,10 +65,12 @@ #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/config/config-stor-status.h> #include <vespa/storage/config/config-stor-visitordispatcher.h> +#include <vespa/storage/distributor/bucket_spaces_stats_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/visiting/config-stor-visitor.h> #include <vespa/storageserver/app/distributorprocess.h> #include <vespa/storageserver/app/servicelayerprocess.h> +#include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> @@ -93,6 +98,7 @@ using proton::HwInfo; using search::index::Schema; using search::index::SchemaBuilder; using search::transactionlog::TransLogServer; +using storage::distributor::BucketSpacesStatsProvider; using storage::rpc::SharedRpcResources; using storage::rpc::StorageApiRpcService; using storage::spi::PersistenceProvider; @@ -148,6 +154,20 @@ int port_number(int base_port, PortBias bias) storage::spi::Context context(storage::spi::Priority(0), 0); +BucketSpacesStatsProvider* extract_bucket_spaces_stats_provider(storage::DistributorProcess& distributor) +{ + auto& node = distributor.getNode(); + auto *link = node.getChain(); + while (link != nullptr) { + link = link->getNextLink(); + auto provider = dynamic_cast<BucketSpacesStatsProvider*>(link); + if (provider != nullptr) { + return provider; + } + } + return nullptr; +} + } std::shared_ptr<AttributesConfig> make_attributes_config() { @@ -441,6 +461,8 @@ class MyBmNode : public BmNode std::unique_ptr<MyServiceLayerProcess> _service_layer; std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; std::unique_ptr<storage::DistributorProcess> _distributor; + BucketSpacesStatsProvider* _bucket_spaces_stats_provider; + std::mutex _lock; void create_document_db(const BmClusterParams& params); public: @@ -458,6 +480,7 @@ public: std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) override; bool has_storage_layer(bool distributor) const override; PersistenceProvider* get_persistence_provider() override; + void merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state) override; }; MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port) @@ -500,7 +523,9 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _service_layer_chain_context(), _service_layer(), _distributor_chain_context(), - _distributor() + _distributor(), + _bucket_spaces_stats_provider(nullptr), + _lock() { _persistence_engine = std::make_unique<proton::PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false); create_document_db(params); @@ -608,6 +633,9 @@ MyBmNode::start_distributor(const BmClusterParams& params) } _distributor->setupConfig(100ms); _distributor->createNode(); + auto bucket_spaces_stats_provider = extract_bucket_spaces_stats_provider(*_distributor); + std::lock_guard<std::mutex> guard(_lock); + _bucket_spaces_stats_provider = bucket_spaces_stats_provider; } void @@ -615,6 +643,10 @@ MyBmNode::shutdown_distributor() { if (_distributor) { LOG(info, "stop distributor"); + { + std::lock_guard guard(_lock); + _bucket_spaces_stats_provider = nullptr; + } _distributor->getNode().requestShutdown("controlled shutdown"); _distributor->shutdown(); } @@ -674,6 +706,56 @@ BmNode::num_ports() return static_cast<unsigned int>(PortBias::NUM_PORTS); } +void +MyBmNode::merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state) +{ + auto& storage_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::STORAGE, _node_idx)); + if (storage_node_state.getState().oneOf("uir")) { + // TODO: Check cluster state and ignore nodes that are down. + if (_document_db) { + proton::DocumentMetaStoreReadGuards dmss(_document_db->getDocumentSubDBs()); + uint32_t active_docs = dmss.numActiveDocs(); + uint32_t ready_docs = dmss.numReadyDocs(); + uint32_t total_docs = dmss.numTotalDocs(); + uint32_t removed_docs = dmss.numRemovedDocs(); + + if (_node_idx < node_stats.size()) { + node_stats[_node_idx].set_document_db_stats(BmDocumentDbStats(active_docs, ready_docs, total_docs, removed_docs)); + } + } + } + auto& distributor_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::DISTRIBUTOR, _node_idx)); + if (distributor_node_state.getState().oneOf("u")) { + std::optional<BucketSpacesStatsProvider::PerNodeBucketSpacesStats> per_node_bucket_spaces_stats; + { + std::lock_guard<std::mutex> guard(_lock); + if (_bucket_spaces_stats_provider) { + per_node_bucket_spaces_stats = _bucket_spaces_stats_provider->getBucketSpacesStats(); + } + } + if (per_node_bucket_spaces_stats.has_value()) { + for (auto &node_idx_and_stats : per_node_bucket_spaces_stats.value()) { + uint32_t node_idx = node_idx_and_stats.first; + if (node_idx < node_stats.size()) { + auto& stats = node_idx_and_stats.second; + for (auto &bucket_space_and_stat : stats) { + auto& stat = bucket_space_and_stat.second; + uint32_t buckets = stat.bucketsTotal(); + uint32_t buckets_pending = stat.bucketsPending(); + bool buckets_valid = stat.valid(); + node_stats[node_idx].merge_bucket_stats(BmBucketsStats(buckets, buckets_pending, buckets_valid)); + } + } + } + } else { + // Incomplete bucket stats + for (uint32_t node_idx = 0; node_idx < node_stats.size(); ++node_idx) { + node_stats[node_idx].merge_bucket_stats(BmBucketsStats()); + } + } + } +} + std::unique_ptr<BmNode> BmNode::create(const vespalib::string& base_dir, int base_port, uint32_t node_idx, BmCluster &cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port) { diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h index 31eb6fc15ff..49c80db44ce 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h @@ -4,6 +4,7 @@ #include <memory> #include <vespa/searchcore/proton/common/doctypename.h> +#include <vector> namespace document { @@ -16,12 +17,14 @@ class Field; namespace document::internal { class InternalDocumenttypesType; } +namespace storage::lib { class ClusterState; } namespace storage::spi { struct PersistenceProvider; } namespace search::bmcluster { class BmCluster; class BmClusterParams; +class BmNodeStats; struct BmStorageLinkContext; class IBmFeedHandler; class IBMDistribution; @@ -47,6 +50,7 @@ public: virtual std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) = 0; virtual bool has_storage_layer(bool distributor) const = 0; virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0; + virtual void merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state) = 0; static unsigned int num_ports(); static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port); }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp new file mode 100644 index 00000000000..c9330dec59e --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp @@ -0,0 +1,69 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_node_stats.h" +#include <cassert> + +namespace search::bmcluster { + +namespace { + +template <class Stats> +void merge(std::optional<Stats> &lhs, const Stats &rhs) +{ + if (lhs) { + auto value = lhs.value(); + value += rhs; + lhs = value; + } else { + lhs = rhs; + } +} + +template <class Stats> +void merge(std::optional<Stats> &lhs, const std::optional<Stats> &rhs) +{ + if (rhs) { + merge(lhs, rhs.value()); + } +} + +} + +BmNodeStats::BmNodeStats() + : _document_db(), + _buckets() +{ +} + + +BmNodeStats::~BmNodeStats() = default; + +BmNodeStats& +BmNodeStats::operator+=(const BmNodeStats& rhs) +{ + merge(_document_db, rhs._document_db); + merge(_buckets, rhs._buckets); + return *this; +} + +bool +BmNodeStats::operator==(const BmNodeStats &rhs) const +{ + return ((_document_db == rhs._document_db) && + (_buckets == rhs._buckets)); +} + +void +BmNodeStats::set_document_db_stats(const BmDocumentDbStats &document_db) +{ + assert(!_document_db); + _document_db = document_db; +} + +void +BmNodeStats::merge_bucket_stats(const BmBucketsStats &buckets) +{ + merge(_buckets, buckets); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h new file mode 100644 index 00000000000..06baae3a9ef --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bm_document_db_stats.h" +#include "bm_buckets_stats.h" +#include <optional> + +namespace search::bmcluster { + +/* + * Class containing stats for a node + */ +class BmNodeStats +{ + std::optional<BmDocumentDbStats> _document_db; + std::optional<BmBucketsStats> _buckets; +public: + BmNodeStats(); + ~BmNodeStats(); + BmNodeStats& operator+=(const BmNodeStats& rhs); + bool operator==(const BmNodeStats &rhs) const; + void set_document_db_stats(const BmDocumentDbStats &document_db); + void merge_bucket_stats(const BmBucketsStats &buckets); + const std::optional<BmDocumentDbStats>& get_document_db_stats() const noexcept { return _document_db; } + const std::optional<BmBucketsStats>& get_buckets_stats() const noexcept { return _buckets; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp new file mode 100644 index 00000000000..4d464923efa --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp @@ -0,0 +1,123 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_node_stats_reporter.h" +#include "bm_cluster.h" +#include "bm_node_stats.h" +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".bmcluster.bm_node_stats_reporter"); + + +using vespalib::makeLambdaTask; + +namespace search::bmcluster { + +BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster) + : _cluster(cluster), + _executor(1, 128_Ki), + _mutex(), + _cond(), + _pending_report(1u), + _started(false), + _stop(false) +{ +} + +BmNodeStatsReporter::~BmNodeStatsReporter() +{ + if (!_stop) { + stop(); + } +} + +void +BmNodeStatsReporter::start(std::chrono::milliseconds interval) +{ + if (!_started) { + _started = true; + _executor.execute(makeLambdaTask([this, interval]() { run_report_loop(interval); })); + std::unique_lock<std::mutex> guard(_mutex); + _cond.wait(guard, [this]() { return _pending_report == 0u; }); + } +} + +void +BmNodeStatsReporter::stop() +{ + { + std::lock_guard guard(_mutex); + _stop = true; + _cond.notify_all(); + } + _executor.sync(); + _executor.shutdown(); +} + +void +BmNodeStatsReporter::report_now() +{ + std::unique_lock<std::mutex> guard(_mutex); + ++_pending_report; + _cond.notify_all(); + _cond.wait(guard, [this]() { return _pending_report == 0u || _stop; }); +} + +void +BmNodeStatsReporter::report() +{ + using Width = vespalib::asciistream::Width; + auto node_stats = _cluster.get_node_stats(); + vespalib::asciistream s; + s << "nodes stats "; + BmNodeStats totals; + for (auto &node : node_stats) { + auto& document_db = node.get_document_db_stats(); + if (document_db.has_value()) { + s << Width(8) << document_db.value().get_total_docs(); + } else { + s << Width(8) << "-"; + } + totals += node; + } + if (totals.get_document_db_stats().has_value()) { + s << Width(8) << totals.get_document_db_stats().value().get_total_docs(); + } else { + s << Width(8) << "-"; + } + auto& total_buckets = totals.get_buckets_stats(); + if (total_buckets.has_value()) { + s << Width(8) << total_buckets.value().get_buckets_pending(); + if (!total_buckets.value().get_valid()) { + s << "?"; + } + } else { + s << Width(8) << "-"; + } + vespalib::string ss(s.str()); + LOG(info, "%s", ss.c_str()); +} + +void +BmNodeStatsReporter::run_report_loop(std::chrono::milliseconds interval) +{ + std::unique_lock<std::mutex> guard(_mutex); + while (!_stop) { + uint32_t pending_handled = _pending_report; + guard.unlock(); + report(); + guard.lock(); + if (pending_handled != 0u) { + _pending_report -= pending_handled; + _cond.notify_all(); + } + if (!_stop && _pending_report == 0u) { + _cond.wait_for(guard, interval); + } + } +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h new file mode 100644 index 00000000000..be0e10e37a4 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <chrono> +#include <mutex> +#include <condition_variable> + +namespace search::bmcluster { + +class BmCluster; + +class BmNodeStatsReporter { + BmCluster& _cluster; + vespalib::ThreadStackExecutor _executor; + std::mutex _mutex; + std::condition_variable _cond; + uint32_t _pending_report; + bool _started; + bool _stop; + + void report(); + void run_report_loop(std::chrono::milliseconds interval); +public: + BmNodeStatsReporter(BmCluster& cluster); + ~BmNodeStatsReporter(); + void start(std::chrono::milliseconds interval); + void stop(); + void report_now(); +}; + +} |