summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-17 15:33:46 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-17 15:33:46 +0200
commit4b35b944335c0e6a73d7258f573fce2286e80647 (patch)
tree5edbf1734bcef3becabcf2d5ad1297c273a369ea /searchcore
parentbf17a89d72bd3e83e89a40cbf80a5641593ffff7 (diff)
Add BmNodeStats, containing info about documents on nodes, buckets
and bucktets with pending ideal state operations.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp38
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h29
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp42
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h32
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp84
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.h4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp69
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h29
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp123
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h33
14 files changed, 513 insertions, 2 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 42becd26195..7f9ae442dd3 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -12,6 +12,8 @@
#include <vespa/searchcore/bmcluster/bm_feeder.h>
#include <vespa/searchcore/bmcluster/bm_feed_params.h>
#include <vespa/searchcore/bmcluster/bm_node.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats_reporter.h>
#include <vespa/searchcore/bmcluster/bm_range.h>
#include <vespa/searchcore/bmcluster/bucket_selector.h>
#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h>
@@ -45,6 +47,7 @@ using search::bmcluster::BmFeed;
using search::bmcluster::BmFeedParams;
using search::bmcluster::BmFeeder;
using search::bmcluster::BmNode;
+using search::bmcluster::BmNodeStatsReporter;
using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
@@ -160,12 +163,19 @@ void benchmark(const BMParams &bm_params)
auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
auto get_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_get_feed(range, bucket_selector); }, f._feed.num_buckets(), "get");
auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
- int64_t time_bias = 1;
+ BmNodeStatsReporter reporter(cluster);
+ reporter.start(500ms);
+ int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
benchmark_feed(feeder, time_bias, put_feed, bm_params, bm_params.get_put_passes(), "put");
+ reporter.report_now();
benchmark_feed(feeder, time_bias, update_feed, bm_params, bm_params.get_update_passes(), "update");
+ reporter.report_now();
benchmark_feed(feeder, time_bias, get_feed, bm_params, bm_params.get_get_passes(), "get");
+ reporter.report_now();
benchmark_feed(feeder, time_bias, remove_feed, bm_params, bm_params.get_remove_passes(), "remove");
+ reporter.report_now();
+ reporter.stop();
LOG(info, "--------------------------------");
cluster.stop();
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 184ada0ab38..d2ab8b0c2a9 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -1,16 +1,20 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchcore_bmcluster STATIC
SOURCES
+ bm_buckets_stats.cpp
bm_cluster.cpp
bm_cluster_controller.cpp
bm_cluster_params.cpp
bm_distribution.cpp
+ bm_document_db_stats.cpp
bm_feed.cpp
bm_feeder.cpp
bm_feed_params.cpp
bm_message_bus.cpp
bm_message_bus_routes.cpp
bm_node.cpp
+ bm_node_stats.cpp
+ bm_node_stats_reporter.cpp
bm_storage_chain_builder.cpp
bm_storage_link.cpp
bm_storage_message_addresses.cpp
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp
new file mode 100644
index 00000000000..f4a3a8f2461
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.cpp
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_buckets_stats.h"
+
+namespace search::bmcluster {
+
+BmBucketsStats::BmBucketsStats()
+ : BmBucketsStats(0u, 0u, false)
+{
+}
+
+BmBucketsStats::BmBucketsStats(uint64_t buckets, uint64_t buckets_pending, bool valid)
+ : _buckets(buckets),
+ _buckets_pending(buckets_pending),
+ _valid(valid)
+{
+}
+
+BmBucketsStats::~BmBucketsStats() = default;
+
+BmBucketsStats&
+BmBucketsStats::operator+=(const BmBucketsStats& rhs)
+{
+ _valid &= rhs._valid;
+ _buckets += rhs._buckets;
+ _buckets_pending += rhs._buckets_pending;
+ return *this;
+}
+
+bool
+BmBucketsStats::operator==(const BmBucketsStats &rhs) const
+{
+ return ((_buckets == rhs._buckets) &&
+ (_buckets_pending == rhs._buckets_pending) &&
+ (_valid == rhs._valid));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h
new file mode 100644
index 00000000000..ca018ff3b50
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_buckets_stats.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+
+namespace search::bmcluster {
+
+/*
+ * Class containing bucket stats
+ */
+class BmBucketsStats
+{
+ uint64_t _buckets;
+ uint64_t _buckets_pending; // Buckets with pending ideal state operations
+ bool _valid;
+
+public:
+ BmBucketsStats();
+ BmBucketsStats(uint64_t buckets, uint64_t buckets_pending, bool valid);
+ ~BmBucketsStats();
+ BmBucketsStats& operator+=(const BmBucketsStats& rhs);
+ bool operator==(const BmBucketsStats &rhs) const;
+ uint64_t get_buckets() const noexcept { return _buckets; }
+ uint64_t get_buckets_pending() const noexcept { return _buckets_pending; }
+ bool get_valid() const noexcept { return _valid; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
index adb7f2dd12b..36e1e394f40 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -6,6 +6,7 @@
#include "bm_feed.h"
#include "bm_message_bus.h"
#include "bm_node.h"
+#include "bm_node_stats.h"
#include "document_api_message_bus_bm_feed_handler.h"
#include "spi_bm_feed_handler.h"
#include "storage_api_chain_bm_feed_handler.h"
@@ -424,4 +425,17 @@ BmCluster::get_feed_handler()
return _feed_handler.get();
}
+std::vector<BmNodeStats>
+BmCluster::get_node_stats()
+{
+ std::vector<BmNodeStats> node_stats(_nodes.size());
+ storage::lib::ClusterState baseline_state(*_distribution->get_cluster_state_bundle().getBaselineClusterState());
+ for (const auto &node : _nodes) {
+ if (node) {
+ node->merge_node_stats(node_stats, baseline_state);
+ }
+ }
+ return node_stats;
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
index e8f72ec8254..8615b44bd7e 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
@@ -29,6 +29,7 @@ class BmClusterController;
class BmFeed;
class BmMessageBus;
class BmNode;
+class BmNodeStats;
class IBmDistribution;
class IBmFeedHandler;
@@ -88,6 +89,7 @@ public:
IBmFeedHandler* get_feed_handler();
uint32_t get_num_nodes() const { return _nodes.size(); }
BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; }
+ std::vector<BmNodeStats> get_node_stats();
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp
new file mode 100644
index 00000000000..93624353026
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.cpp
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_document_db_stats.h"
+
+namespace search::bmcluster {
+
+BmDocumentDbStats::BmDocumentDbStats()
+ : BmDocumentDbStats(0u, 0u, 0u, 0u)
+{
+}
+
+BmDocumentDbStats::BmDocumentDbStats(uint64_t active_docs, uint64_t stored_docs, uint64_t total_docs, uint64_t removed_docs)
+ : _active_docs(active_docs),
+ _stored_docs(stored_docs),
+ _total_docs(total_docs),
+ _removed_docs(removed_docs)
+{
+}
+
+
+BmDocumentDbStats::~BmDocumentDbStats() = default;
+
+BmDocumentDbStats&
+BmDocumentDbStats::operator+=(const BmDocumentDbStats& rhs)
+{
+ _active_docs += rhs._active_docs;
+ _stored_docs += rhs._stored_docs;
+ _total_docs += rhs._total_docs;
+ _removed_docs += rhs._removed_docs;
+ return *this;
+}
+
+bool
+BmDocumentDbStats::operator==(const BmDocumentDbStats &rhs) const
+{
+ return ((_active_docs == rhs._active_docs) &&
+ (_stored_docs == rhs._stored_docs) &&
+ (_total_docs == rhs._total_docs) &&
+ (_removed_docs == rhs._removed_docs));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h
new file mode 100644
index 00000000000..33559fcc26f
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_document_db_stats.h
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+
+namespace search::bmcluster {
+
+/*
+ * Class containing stats for a document db
+ */
+class BmDocumentDbStats
+{
+ uint64_t _active_docs;
+ uint64_t _stored_docs;
+ uint64_t _total_docs;
+ uint64_t _removed_docs;
+
+public:
+ BmDocumentDbStats();
+ BmDocumentDbStats(uint64_t active_docs, uint64_t stored_docs, uint64_t total_docs, uint64_t removed_docs);
+ ~BmDocumentDbStats();
+ BmDocumentDbStats& operator+=(const BmDocumentDbStats& rhs);
+ bool operator==(const BmDocumentDbStats &rhs) const;
+ uint64_t get_active_docs() const noexcept { return _active_docs; }
+ uint64_t get_stored_docs() const noexcept { return _stored_docs; }
+ uint64_t get_total_docs() const noexcept { return _total_docs; }
+ uint64_t get_removed_docs() const noexcept { return _removed_docs; }
+};
+
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 7a297e82655..7ed83ed669a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -4,6 +4,7 @@
#include "bm_cluster.h"
#include "bm_cluster_params.h"
#include "bm_message_bus.h"
+#include "bm_node_stats.h"
#include "bm_storage_chain_builder.h"
#include "bm_storage_link_context.h"
#include "storage_api_chain_bm_feed_handler.h"
@@ -44,6 +45,7 @@
#include <vespa/searchcore/proton/server/bootstrapconfig.h>
#include <vespa/searchcore/proton/server/documentdb.h>
#include <vespa/searchcore/proton/server/document_db_maintenance_config.h>
+#include <vespa/searchcore/proton/server/document_meta_store_read_guards.h>
#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
#include <vespa/searchcore/proton/server/fileconfigmanager.h>
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
@@ -54,6 +56,7 @@
#include <vespa/searchsummary/config/config-juniperrc.h>
#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
#include <vespa/storage/common/i_storage_chain_builder.h>
+#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/config/config-stor-bouncer.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
#include <vespa/storage/config/config-stor-distributormanager.h>
@@ -62,10 +65,12 @@
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/config/config-stor-status.h>
#include <vespa/storage/config/config-stor-visitordispatcher.h>
+#include <vespa/storage/distributor/bucket_spaces_stats_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storageserver/app/distributorprocess.h>
#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
@@ -93,6 +98,7 @@ using proton::HwInfo;
using search::index::Schema;
using search::index::SchemaBuilder;
using search::transactionlog::TransLogServer;
+using storage::distributor::BucketSpacesStatsProvider;
using storage::rpc::SharedRpcResources;
using storage::rpc::StorageApiRpcService;
using storage::spi::PersistenceProvider;
@@ -148,6 +154,20 @@ int port_number(int base_port, PortBias bias)
storage::spi::Context context(storage::spi::Priority(0), 0);
+BucketSpacesStatsProvider* extract_bucket_spaces_stats_provider(storage::DistributorProcess& distributor)
+{
+ auto& node = distributor.getNode();
+ auto *link = node.getChain();
+ while (link != nullptr) {
+ link = link->getNextLink();
+ auto provider = dynamic_cast<BucketSpacesStatsProvider*>(link);
+ if (provider != nullptr) {
+ return provider;
+ }
+ }
+ return nullptr;
+}
+
}
std::shared_ptr<AttributesConfig> make_attributes_config() {
@@ -441,6 +461,8 @@ class MyBmNode : public BmNode
std::unique_ptr<MyServiceLayerProcess> _service_layer;
std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
std::unique_ptr<storage::DistributorProcess> _distributor;
+ BucketSpacesStatsProvider* _bucket_spaces_stats_provider;
+ std::mutex _lock;
void create_document_db(const BmClusterParams& params);
public:
@@ -458,6 +480,7 @@ public:
std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) override;
bool has_storage_layer(bool distributor) const override;
PersistenceProvider* get_persistence_provider() override;
+ void merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state) override;
};
MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
@@ -500,7 +523,9 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod
_service_layer_chain_context(),
_service_layer(),
_distributor_chain_context(),
- _distributor()
+ _distributor(),
+ _bucket_spaces_stats_provider(nullptr),
+ _lock()
{
_persistence_engine = std::make_unique<proton::PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false);
create_document_db(params);
@@ -608,6 +633,9 @@ MyBmNode::start_distributor(const BmClusterParams& params)
}
_distributor->setupConfig(100ms);
_distributor->createNode();
+ auto bucket_spaces_stats_provider = extract_bucket_spaces_stats_provider(*_distributor);
+ std::lock_guard<std::mutex> guard(_lock);
+ _bucket_spaces_stats_provider = bucket_spaces_stats_provider;
}
void
@@ -615,6 +643,10 @@ MyBmNode::shutdown_distributor()
{
if (_distributor) {
LOG(info, "stop distributor");
+ {
+ std::lock_guard guard(_lock);
+ _bucket_spaces_stats_provider = nullptr;
+ }
_distributor->getNode().requestShutdown("controlled shutdown");
_distributor->shutdown();
}
@@ -674,6 +706,56 @@ BmNode::num_ports()
return static_cast<unsigned int>(PortBias::NUM_PORTS);
}
+void
+MyBmNode::merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state)
+{
+ auto& storage_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::STORAGE, _node_idx));
+ if (storage_node_state.getState().oneOf("uir")) {
+ // TODO: Check cluster state and ignore nodes that are down.
+ if (_document_db) {
+ proton::DocumentMetaStoreReadGuards dmss(_document_db->getDocumentSubDBs());
+ uint32_t active_docs = dmss.numActiveDocs();
+ uint32_t ready_docs = dmss.numReadyDocs();
+ uint32_t total_docs = dmss.numTotalDocs();
+ uint32_t removed_docs = dmss.numRemovedDocs();
+
+ if (_node_idx < node_stats.size()) {
+ node_stats[_node_idx].set_document_db_stats(BmDocumentDbStats(active_docs, ready_docs, total_docs, removed_docs));
+ }
+ }
+ }
+ auto& distributor_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::DISTRIBUTOR, _node_idx));
+ if (distributor_node_state.getState().oneOf("u")) {
+ std::optional<BucketSpacesStatsProvider::PerNodeBucketSpacesStats> per_node_bucket_spaces_stats;
+ {
+ std::lock_guard<std::mutex> guard(_lock);
+ if (_bucket_spaces_stats_provider) {
+ per_node_bucket_spaces_stats = _bucket_spaces_stats_provider->getBucketSpacesStats();
+ }
+ }
+ if (per_node_bucket_spaces_stats.has_value()) {
+ for (auto &node_idx_and_stats : per_node_bucket_spaces_stats.value()) {
+ uint32_t node_idx = node_idx_and_stats.first;
+ if (node_idx < node_stats.size()) {
+ auto& stats = node_idx_and_stats.second;
+ for (auto &bucket_space_and_stat : stats) {
+ auto& stat = bucket_space_and_stat.second;
+ uint32_t buckets = stat.bucketsTotal();
+ uint32_t buckets_pending = stat.bucketsPending();
+ bool buckets_valid = stat.valid();
+ node_stats[node_idx].merge_bucket_stats(BmBucketsStats(buckets, buckets_pending, buckets_valid));
+ }
+ }
+ }
+ } else {
+ // Incomplete bucket stats
+ for (uint32_t node_idx = 0; node_idx < node_stats.size(); ++node_idx) {
+ node_stats[node_idx].merge_bucket_stats(BmBucketsStats());
+ }
+ }
+ }
+}
+
std::unique_ptr<BmNode>
BmNode::create(const vespalib::string& base_dir, int base_port, uint32_t node_idx, BmCluster &cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
{
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
index 31eb6fc15ff..49c80db44ce 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
@@ -4,6 +4,7 @@
#include <memory>
#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vector>
namespace document {
@@ -16,12 +17,14 @@ class Field;
namespace document::internal { class InternalDocumenttypesType; }
+namespace storage::lib { class ClusterState; }
namespace storage::spi { struct PersistenceProvider; }
namespace search::bmcluster {
class BmCluster;
class BmClusterParams;
+class BmNodeStats;
struct BmStorageLinkContext;
class IBmFeedHandler;
class IBMDistribution;
@@ -47,6 +50,7 @@ public:
virtual std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) = 0;
virtual bool has_storage_layer(bool distributor) const = 0;
virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0;
+ virtual void merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::ClusterState &baseline_state) = 0;
static unsigned int num_ports();
static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port);
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp
new file mode 100644
index 00000000000..c9330dec59e
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_node_stats.h"
+#include <cassert>
+
+namespace search::bmcluster {
+
+namespace {
+
+template <class Stats>
+void merge(std::optional<Stats> &lhs, const Stats &rhs)
+{
+ if (lhs) {
+ auto value = lhs.value();
+ value += rhs;
+ lhs = value;
+ } else {
+ lhs = rhs;
+ }
+}
+
+template <class Stats>
+void merge(std::optional<Stats> &lhs, const std::optional<Stats> &rhs)
+{
+ if (rhs) {
+ merge(lhs, rhs.value());
+ }
+}
+
+}
+
+BmNodeStats::BmNodeStats()
+ : _document_db(),
+ _buckets()
+{
+}
+
+
+BmNodeStats::~BmNodeStats() = default;
+
+BmNodeStats&
+BmNodeStats::operator+=(const BmNodeStats& rhs)
+{
+ merge(_document_db, rhs._document_db);
+ merge(_buckets, rhs._buckets);
+ return *this;
+}
+
+bool
+BmNodeStats::operator==(const BmNodeStats &rhs) const
+{
+ return ((_document_db == rhs._document_db) &&
+ (_buckets == rhs._buckets));
+}
+
+void
+BmNodeStats::set_document_db_stats(const BmDocumentDbStats &document_db)
+{
+ assert(!_document_db);
+ _document_db = document_db;
+}
+
+void
+BmNodeStats::merge_bucket_stats(const BmBucketsStats &buckets)
+{
+ merge(_buckets, buckets);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h
new file mode 100644
index 00000000000..06baae3a9ef
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bm_document_db_stats.h"
+#include "bm_buckets_stats.h"
+#include <optional>
+
+namespace search::bmcluster {
+
+/*
+ * Class containing stats for a node
+ */
+class BmNodeStats
+{
+ std::optional<BmDocumentDbStats> _document_db;
+ std::optional<BmBucketsStats> _buckets;
+public:
+ BmNodeStats();
+ ~BmNodeStats();
+ BmNodeStats& operator+=(const BmNodeStats& rhs);
+ bool operator==(const BmNodeStats &rhs) const;
+ void set_document_db_stats(const BmDocumentDbStats &document_db);
+ void merge_bucket_stats(const BmBucketsStats &buckets);
+ const std::optional<BmDocumentDbStats>& get_document_db_stats() const noexcept { return _document_db; }
+ const std::optional<BmBucketsStats>& get_buckets_stats() const noexcept { return _buckets; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
new file mode 100644
index 00000000000..4d464923efa
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
@@ -0,0 +1,123 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_node_stats_reporter.h"
+#include "bm_cluster.h"
+#include "bm_node_stats.h"
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bmcluster.bm_node_stats_reporter");
+
+
+using vespalib::makeLambdaTask;
+
+namespace search::bmcluster {
+
+BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster)
+ : _cluster(cluster),
+ _executor(1, 128_Ki),
+ _mutex(),
+ _cond(),
+ _pending_report(1u),
+ _started(false),
+ _stop(false)
+{
+}
+
+BmNodeStatsReporter::~BmNodeStatsReporter()
+{
+ if (!_stop) {
+ stop();
+ }
+}
+
+void
+BmNodeStatsReporter::start(std::chrono::milliseconds interval)
+{
+ if (!_started) {
+ _started = true;
+ _executor.execute(makeLambdaTask([this, interval]() { run_report_loop(interval); }));
+ std::unique_lock<std::mutex> guard(_mutex);
+ _cond.wait(guard, [this]() { return _pending_report == 0u; });
+ }
+}
+
+void
+BmNodeStatsReporter::stop()
+{
+ {
+ std::lock_guard guard(_mutex);
+ _stop = true;
+ _cond.notify_all();
+ }
+ _executor.sync();
+ _executor.shutdown();
+}
+
+void
+BmNodeStatsReporter::report_now()
+{
+ std::unique_lock<std::mutex> guard(_mutex);
+ ++_pending_report;
+ _cond.notify_all();
+ _cond.wait(guard, [this]() { return _pending_report == 0u || _stop; });
+}
+
+void
+BmNodeStatsReporter::report()
+{
+ using Width = vespalib::asciistream::Width;
+ auto node_stats = _cluster.get_node_stats();
+ vespalib::asciistream s;
+ s << "nodes stats ";
+ BmNodeStats totals;
+ for (auto &node : node_stats) {
+ auto& document_db = node.get_document_db_stats();
+ if (document_db.has_value()) {
+ s << Width(8) << document_db.value().get_total_docs();
+ } else {
+ s << Width(8) << "-";
+ }
+ totals += node;
+ }
+ if (totals.get_document_db_stats().has_value()) {
+ s << Width(8) << totals.get_document_db_stats().value().get_total_docs();
+ } else {
+ s << Width(8) << "-";
+ }
+ auto& total_buckets = totals.get_buckets_stats();
+ if (total_buckets.has_value()) {
+ s << Width(8) << total_buckets.value().get_buckets_pending();
+ if (!total_buckets.value().get_valid()) {
+ s << "?";
+ }
+ } else {
+ s << Width(8) << "-";
+ }
+ vespalib::string ss(s.str());
+ LOG(info, "%s", ss.c_str());
+}
+
+void
+BmNodeStatsReporter::run_report_loop(std::chrono::milliseconds interval)
+{
+ std::unique_lock<std::mutex> guard(_mutex);
+ while (!_stop) {
+ uint32_t pending_handled = _pending_report;
+ guard.unlock();
+ report();
+ guard.lock();
+ if (pending_handled != 0u) {
+ _pending_report -= pending_handled;
+ _cond.notify_all();
+ }
+ if (!_stop && _pending_report == 0u) {
+ _cond.wait_for(guard, interval);
+ }
+ }
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
new file mode 100644
index 00000000000..be0e10e37a4
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+
+namespace search::bmcluster {
+
+class BmCluster;
+
+class BmNodeStatsReporter {
+ BmCluster& _cluster;
+ vespalib::ThreadStackExecutor _executor;
+ std::mutex _mutex;
+ std::condition_variable _cond;
+ uint32_t _pending_report;
+ bool _started;
+ bool _stop;
+
+ void report();
+ void run_report_loop(std::chrono::milliseconds interval);
+public:
+ BmNodeStatsReporter(BmCluster& cluster);
+ ~BmNodeStatsReporter();
+ void start(std::chrono::milliseconds interval);
+ void stop();
+ void report_now();
+};
+
+}