summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-10-01 11:25:17 +0200
committerTor Egge <Tor.Egge@online.no>2021-10-01 11:25:17 +0200
commitbe7637d3caf41ea4e03386d76e22227a876480e6 (patch)
treec472333135a0f255be4a63d5cfa909627b25f262 /searchcore
parent4610d5c0e0b55c1fbf77b82f34f1d0b433c18ae6 (diff)
Report merge stats.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp2
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.cpp36
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.h27
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp45
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h3
12 files changed, 189 insertions, 21 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 0fe673cebe0..1f488b10609 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -167,7 +167,7 @@ Benchmark::run()
auto update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update");
auto get_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_get_feed(range, bucket_selector); }, _feed.num_buckets(), "get");
auto remove_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_remove_feed(range, bucket_selector); }, _feed.num_buckets(), "remove");
- BmNodeStatsReporter reporter(*_cluster);
+ BmNodeStatsReporter reporter(*_cluster, false);
reporter.start(500ms);
int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
index 1b8f93b381f..3e7b2439a0c 100644
--- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -279,7 +279,7 @@ Benchmark::feed()
vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
- BmNodeStatsReporter reporter(*_cluster);
+ BmNodeStatsReporter reporter(*_cluster, false);
reporter.start(500ms);
int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
@@ -297,7 +297,7 @@ Benchmark::feed()
std::chrono::duration<double>
Benchmark::redistribute()
{
- BmNodeStatsReporter reporter(*_cluster);
+ BmNodeStatsReporter reporter(*_cluster, true);
auto before = std::chrono::steady_clock::now();
reporter.start(500ms);
_cluster->propagate_cluster_state();
@@ -367,12 +367,16 @@ App::usage()
"vespa-redistribute-bm\n"
"[--bucket-db-stripe-bits bits]\n"
"[--client-threads threads]\n"
+ "[--distributor-merge-busy-wait distributor-merge-busy-wait]\n"
"[--distributor-stripes stripes]\n"
"[--documents documents]\n"
"[--flip-nodes flip-nodes]\n"
"[--groups groups]\n"
"[--indexing-sequencer [latency,throughput,adaptive]]\n"
+ "[--max-merges-per-node max-merges-per-node]\n"
+ "[--max-merge-queue-size max-merge-queue-size]\n"
"[--max-pending max-pending]\n"
+ "[--max-pending-idealstate-operations max-pending-idealstate-operations]\n"
"[--mode [grow, shrink, perm-crash, temp-crash, replace]\n"
"[--nodes-per-group nodes-per-group]\n"
"[--redundancy redundancy]\n"
@@ -394,12 +398,16 @@ App::get_options()
static struct option long_opts[] = {
{ "bucket-db-stripe-bits", 1, nullptr, 0 },
{ "client-threads", 1, nullptr, 0 },
+ { "distributor-merge-busy-wait", 1, nullptr, 0 },
{ "distributor-stripes", 1, nullptr, 0 },
{ "documents", 1, nullptr, 0 },
{ "flip-nodes", 1, nullptr, 0 },
{ "groups", 1, nullptr, 0 },
{ "indexing-sequencer", 1, nullptr, 0 },
+ { "max-merges-per-node", 1, nullptr, 0 },
+ { "max-merge-queue-size", 1, nullptr, 0 },
{ "max-pending", 1, nullptr, 0 },
+ { "max-pending-idealstate-operations", 1, nullptr, 0 },
{ "mode", 1, nullptr, 0 },
{ "nodes-per-group", 1, nullptr, 0 },
{ "redundancy", 1, nullptr, 0 },
@@ -414,12 +422,16 @@ App::get_options()
enum longopts_enum {
LONGOPT_BUCKET_DB_STRIPE_BITS,
LONGOPT_CLIENT_THREADS,
+ LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT,
LONGOPT_DISTRIBUTOR_STRIPES,
LONGOPT_DOCUMENTS,
LONGOPT_FLIP_NODES,
LONGOPT_GROUPS,
LONGOPT_INDEXING_SEQUENCER,
+ LONGOPT_MAX_MERGES_PER_NODE,
+ LONGOPT_MAX_MERGE_QUEUE_SIZE,
LONGOPT_MAX_PENDING,
+ LONGOPT_MAX_PENDING_IDEALSTATE_OPERATIONS,
LONGOPT_MODE,
LONGOPT_NODES_PER_GROUP,
LONGOPT_REDUNDANCY,
@@ -443,6 +455,9 @@ App::get_options()
case LONGOPT_CLIENT_THREADS:
_bm_params.set_client_threads(atoi(opt_argument));
break;
+ case LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT:
+ _bm_params.set_distributor_merge_busy_wait(atoi(opt_argument));
+ break;
case LONGOPT_DISTRIBUTOR_STRIPES:
_bm_params.set_distributor_stripes(atoi(opt_argument));
break;
@@ -458,9 +473,18 @@ App::get_options()
case LONGOPT_INDEXING_SEQUENCER:
_bm_params.set_indexing_sequencer(opt_argument);
break;
+ case LONGOPT_MAX_MERGES_PER_NODE:
+ _bm_params.set_max_merges_per_node(atoi(opt_argument));
+ break;
+ case LONGOPT_MAX_MERGE_QUEUE_SIZE:
+ _bm_params.set_max_merge_queue_size(atoi(opt_argument));
+ break;
case LONGOPT_MAX_PENDING:
_bm_params.set_max_pending(atoi(opt_argument));
break;
+ case LONGOPT_MAX_PENDING_IDEALSTATE_OPERATIONS:
+ _bm_params.set_max_pending_idealstate_operations(atoi(opt_argument));
+ break;
case LONGOPT_MODE:
_bm_params.set_mode(get_mode(opt_argument));
if (_bm_params.get_mode() == Mode::BAD) {
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 0f2299958c3..ac9ae5519f7 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_library(searchcore_bmcluster STATIC
bm_feed.cpp
bm_feeder.cpp
bm_feed_params.cpp
+ bm_merge_stats.cpp
bm_message_bus.cpp
bm_message_bus_routes.cpp
bm_node.cpp
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
index b8319529a3b..157c74d49a4 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
@@ -7,11 +7,15 @@ namespace search::bmcluster {
BmClusterParams::BmClusterParams()
: _bucket_db_stripe_bits(4),
+ _distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def
_distributor_stripes(0),
_enable_distributor(false),
_enable_service_layer(false),
_groups(0),
_indexing_sequencer(),
+ _max_merges_per_node(16), // Same default as in stor-server.def
+ _max_merge_queue_size(1024), // Same default as in stor-server.def
+ _max_pending_idealstate_operations(100), // Same default as in stor-distributormanager.def
_num_nodes(1),
_nodes_per_group(1),
_redundancy(1),
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
index fbb16ba740f..d4b277609ca 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -13,11 +13,15 @@ namespace search::bmcluster {
class BmClusterParams
{
uint32_t _bucket_db_stripe_bits;
+ uint32_t _distributor_merge_busy_wait;
uint32_t _distributor_stripes;
bool _enable_distributor;
bool _enable_service_layer;
uint32_t _groups;
vespalib::string _indexing_sequencer;
+ uint32_t _max_merges_per_node;
+ uint32_t _max_merge_queue_size;
+ uint32_t _max_pending_idealstate_operations;
uint32_t _num_nodes;
uint32_t _nodes_per_group;
uint32_t _redundancy;
@@ -36,10 +40,14 @@ public:
BmClusterParams();
~BmClusterParams();
uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
+ uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; }
uint32_t get_distributor_stripes() const { return _distributor_stripes; }
bool get_enable_distributor() const { return _enable_distributor; }
uint32_t get_groups() const noexcept { return _groups; }
const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
+ uint32_t get_max_merges_per_node() const noexcept { return _max_merges_per_node; }
+ uint32_t get_max_merge_queue_size() const noexcept { return _max_merge_queue_size; }
+ uint32_t get_max_pending_idealstate_operations() const noexcept { return _max_pending_idealstate_operations; }
uint32_t get_nodes_per_group() const noexcept { return _nodes_per_group; }
uint32_t get_num_nodes() const { return _num_nodes; }
uint32_t get_redundancy() const { return _redundancy; }
@@ -57,11 +65,15 @@ public:
bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
+ void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; }
void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; }
void set_enable_distributor(bool value) { _enable_distributor = value; }
void set_enable_service_layer(bool value) { _enable_service_layer = value; }
void set_groups(uint32_t value);
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
+ void set_max_merges_per_node(uint32_t value) { _max_merges_per_node = value; }
+ void set_max_merge_queue_size(uint32_t value) { _max_merge_queue_size = value; }
+ void set_max_pending_idealstate_operations(uint32_t value) { _max_pending_idealstate_operations = value; }
void set_nodes_per_group(uint32_t value);
void set_redundancy(uint32_t value) { _redundancy = value; }
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.cpp
new file mode 100644
index 00000000000..fc8431f31a1
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.cpp
@@ -0,0 +1,36 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_merge_stats.h"
+
+namespace search::bmcluster {
+
+BmMergeStats::BmMergeStats()
+ : BmMergeStats(0u, 0u)
+{
+}
+
+BmMergeStats::BmMergeStats(uint32_t active, uint32_t queued)
+ : _active(active),
+ _queued(queued)
+{
+}
+
+
+BmMergeStats::~BmMergeStats() = default;
+
+BmMergeStats&
+BmMergeStats::operator+=(const BmMergeStats& rhs)
+{
+ _active += rhs._active;
+ _queued += rhs._queued;
+ return *this;
+}
+
+bool
+BmMergeStats::operator==(const BmMergeStats &rhs) const
+{
+ return ((_active == rhs._active) &&
+ (_queued == rhs._queued));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.h
new file mode 100644
index 00000000000..7eb1ba1752c
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_merge_stats.h
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+
+namespace search::bmcluster {
+
+/*
+ * Class containing merge (throttler) stats for a service layer node.
+ */
+class BmMergeStats
+{
+ uint32_t _active;
+ uint32_t _queued;
+
+public:
+ BmMergeStats();
+ BmMergeStats(uint32_t active, uint32_t queued);
+ ~BmMergeStats();
+ BmMergeStats& operator+=(const BmMergeStats& rhs);
+ bool operator==(const BmMergeStats &rhs) const;
+ uint64_t get_active() const noexcept { return _active; }
+ uint64_t get_queued() const noexcept { return _queued; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 7ed83ed669a..15683a6a455 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -66,6 +66,7 @@
#include <vespa/storage/config/config-stor-status.h>
#include <vespa/storage/config/config-stor-visitordispatcher.h>
#include <vespa/storage/distributor/bucket_spaces_stats_provider.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storageserver/app/distributorprocess.h>
@@ -98,6 +99,7 @@ using proton::HwInfo;
using search::index::Schema;
using search::index::SchemaBuilder;
using search::transactionlog::TransLogServer;
+using storage::MergeThrottler;
using storage::distributor::BucketSpacesStatsProvider;
using storage::rpc::SharedRpcResources;
using storage::rpc::StorageApiRpcService;
@@ -154,15 +156,16 @@ int port_number(int base_port, PortBias bias)
storage::spi::Context context(storage::spi::Priority(0), 0);
-BucketSpacesStatsProvider* extract_bucket_spaces_stats_provider(storage::DistributorProcess& distributor)
+template <class ChainLink, class Process>
+ChainLink* extract_chain_link(Process &process)
{
- auto& node = distributor.getNode();
+ auto& node = process.getNode();
auto *link = node.getChain();
while (link != nullptr) {
link = link->getNextLink();
- auto provider = dynamic_cast<BucketSpacesStatsProvider*>(link);
- if (provider != nullptr) {
- return provider;
+ auto chain_link = dynamic_cast<ChainLink*>(link);
+ if (chain_link != nullptr) {
+ return chain_link;
}
}
return nullptr;
@@ -325,6 +328,8 @@ struct StorageConfigSet
} else {
stor_server.rootFolder = base_dir + "/storage";
}
+ stor_server.maxMergesPerNode = params.get_max_merges_per_node();
+ stor_server.maxMergeQueueSize = params.get_max_merge_queue_size();
make_slobroks_config(slobroks, slobrok_port);
stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakeup();
@@ -402,6 +407,8 @@ struct DistributorConfigSet : public StorageConfigSet
stor_distributormanager(),
stor_visitordispatcher()
{
+ stor_distributormanager.inhibitMergeSendingOnBusyNodeDurationSec = params.get_distributor_merge_busy_wait();
+ stor_distributormanager.maxpendingidealstateoperations = params.get_max_pending_idealstate_operations();
stor_distributormanager.numDistributorStripes = params.get_distributor_stripes();
}
@@ -459,6 +466,7 @@ class MyBmNode : public BmNode
std::unique_ptr<mbus::Slobrok> _slobrok;
std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
std::unique_ptr<MyServiceLayerProcess> _service_layer;
+ MergeThrottler* _merge_throttler;
std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
std::unique_ptr<storage::DistributorProcess> _distributor;
BucketSpacesStatsProvider* _bucket_spaces_stats_provider;
@@ -522,6 +530,7 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod
_slobrok(),
_service_layer_chain_context(),
_service_layer(),
+ _merge_throttler(nullptr),
_distributor_chain_context(),
_distributor(),
_bucket_spaces_stats_provider(nullptr),
@@ -610,6 +619,9 @@ MyBmNode::start_service_layer(const BmClusterParams& params)
std::move(chain_builder));
_service_layer->setupConfig(100ms);
_service_layer->createNode();
+ auto merge_throttler = extract_chain_link<MergeThrottler>(*_service_layer);
+ std::lock_guard<std::mutex> guard(_lock);
+ _merge_throttler = merge_throttler;
}
void
@@ -633,7 +645,7 @@ MyBmNode::start_distributor(const BmClusterParams& params)
}
_distributor->setupConfig(100ms);
_distributor->createNode();
- auto bucket_spaces_stats_provider = extract_bucket_spaces_stats_provider(*_distributor);
+ auto bucket_spaces_stats_provider = extract_chain_link<BucketSpacesStatsProvider>(*_distributor);
std::lock_guard<std::mutex> guard(_lock);
_bucket_spaces_stats_provider = bucket_spaces_stats_provider;
}
@@ -657,6 +669,10 @@ MyBmNode::shutdown_service_layer()
{
if (_service_layer) {
LOG(info, "stop service layer");
+ {
+ std::lock_guard guard(_lock);
+ _merge_throttler = nullptr;
+ }
_service_layer->getNode().requestShutdown("controlled shutdown");
_service_layer->shutdown();
}
@@ -711,7 +727,6 @@ MyBmNode::merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::C
{
auto& storage_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::STORAGE, _node_idx));
if (storage_node_state.getState().oneOf("uir")) {
- // TODO: Check cluster state and ignore nodes that are down.
if (_document_db) {
proton::DocumentMetaStoreReadGuards dmss(_document_db->getDocumentSubDBs());
uint32_t active_docs = dmss.numActiveDocs();
@@ -723,6 +738,22 @@ MyBmNode::merge_node_stats(std::vector<BmNodeStats>& node_stats, storage::lib::C
node_stats[_node_idx].set_document_db_stats(BmDocumentDbStats(active_docs, ready_docs, total_docs, removed_docs));
}
}
+ std::lock_guard<std::mutex> guard(_lock);
+ if (_merge_throttler) {
+ auto& state_lock = _merge_throttler->getStateLock();
+ auto& active_merges = _merge_throttler->getActiveMerges();
+ auto& merge_queue = _merge_throttler->getMergeQueue();
+ uint32_t active_merges_size = 0;
+ uint32_t merge_queue_size = 0;
+ {
+ std::lock_guard mt_guard(state_lock);
+ active_merges_size = active_merges.size();
+ merge_queue_size = merge_queue.size();
+ }
+ if (_node_idx < node_stats.size()) {
+ node_stats[_node_idx].set_merge_stats(BmMergeStats(active_merges_size, merge_queue_size));
+ }
+ }
}
auto& distributor_node_state = baseline_state.getNodeState(storage::lib::Node(storage::lib::NodeType::DISTRIBUTOR, _node_idx));
if (distributor_node_state.getState().oneOf("u")) {
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp
index c9330dec59e..484c4722e2e 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.cpp
@@ -30,8 +30,9 @@ void merge(std::optional<Stats> &lhs, const std::optional<Stats> &rhs)
}
BmNodeStats::BmNodeStats()
- : _document_db(),
- _buckets()
+ : _buckets(),
+ _document_db(),
+ _merges()
{
}
@@ -41,16 +42,18 @@ BmNodeStats::~BmNodeStats() = default;
BmNodeStats&
BmNodeStats::operator+=(const BmNodeStats& rhs)
{
- merge(_document_db, rhs._document_db);
merge(_buckets, rhs._buckets);
+ merge(_document_db, rhs._document_db);
+ merge(_merges, rhs._merges);
return *this;
}
bool
BmNodeStats::operator==(const BmNodeStats &rhs) const
{
- return ((_document_db == rhs._document_db) &&
- (_buckets == rhs._buckets));
+ return ((_buckets == rhs._buckets) &&
+ (_document_db == rhs._document_db) &&
+ (_merges == rhs._merges));
}
void
@@ -66,4 +69,11 @@ BmNodeStats::merge_bucket_stats(const BmBucketsStats &buckets)
merge(_buckets, buckets);
}
+void
+BmNodeStats::set_merge_stats(const BmMergeStats &merges)
+{
+ assert(!_merges);
+ _merges = merges;
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h
index 06baae3a9ef..03dcd359c21 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats.h
@@ -2,8 +2,9 @@
#pragma once
-#include "bm_document_db_stats.h"
#include "bm_buckets_stats.h"
+#include "bm_document_db_stats.h"
+#include "bm_merge_stats.h"
#include <optional>
namespace search::bmcluster {
@@ -13,17 +14,20 @@ namespace search::bmcluster {
*/
class BmNodeStats
{
- std::optional<BmDocumentDbStats> _document_db;
std::optional<BmBucketsStats> _buckets;
+ std::optional<BmDocumentDbStats> _document_db;
+ std::optional<BmMergeStats> _merges;
public:
BmNodeStats();
~BmNodeStats();
BmNodeStats& operator+=(const BmNodeStats& rhs);
bool operator==(const BmNodeStats &rhs) const;
- void set_document_db_stats(const BmDocumentDbStats &document_db);
void merge_bucket_stats(const BmBucketsStats &buckets);
- const std::optional<BmDocumentDbStats>& get_document_db_stats() const noexcept { return _document_db; }
+ void set_document_db_stats(const BmDocumentDbStats &document_db);
+ void set_merge_stats(const BmMergeStats &merges);
const std::optional<BmBucketsStats>& get_buckets_stats() const noexcept { return _buckets; }
+ const std::optional<BmDocumentDbStats>& get_document_db_stats() const noexcept { return _document_db; }
+ const std::optional<BmMergeStats>& get_merge_stats() const noexcept { return _merges; }
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
index ecd0593031e..da3964176e6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
@@ -32,7 +32,7 @@ bool steady_buckets_stats(const std::optional<BmBucketsStats> buckets)
}
-BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster)
+BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster, bool report_merge_stats)
: _cluster(cluster),
_executor(1, 128_Ki),
_mutex(),
@@ -40,6 +40,7 @@ BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster)
_change_time(),
_prev_node_stats(),
_pending_report(1u),
+ _report_merge_stats(report_merge_stats),
_started(false),
_stop(false)
{
@@ -117,6 +118,23 @@ BmNodeStatsReporter::report()
}
vespalib::string ss(s.str());
LOG(info, "%s", ss.c_str());
+ if (_report_merge_stats) {
+ s.clear();
+ vespalib::asciistream ns;
+ s << "merge stats ";
+ for (auto& node : node_stats) {
+ auto &merges = node.get_merge_stats();
+ if (merges.has_value()) {
+ ns.clear();
+ ns << merges.value().get_active() << "/" << merges.value().get_queued();
+ s << Width(10) << ns.str();
+ } else {
+ s << Width(10) << "-";
+ }
+ }
+ ss = s.str();
+ LOG(info, "%s", ss.c_str());
+ }
if (!(node_stats == _prev_node_stats) || !steady_buckets_stats(total_buckets)) {
_change_time = std::chrono::steady_clock::now();
_prev_node_stats = node_stats;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
index 1c8f013f5e6..f4f32cf79c8 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
@@ -24,13 +24,14 @@ class BmNodeStatsReporter {
std::chrono::time_point<std::chrono::steady_clock> _change_time;
std::vector<BmNodeStats> _prev_node_stats;
uint32_t _pending_report;
+ bool _report_merge_stats;
bool _started;
bool _stop;
void report();
void run_report_loop(std::chrono::milliseconds interval);
public:
- BmNodeStatsReporter(BmCluster& cluster);
+ BmNodeStatsReporter(BmCluster& cluster, bool report_merge_stats);
~BmNodeStatsReporter();
void start(std::chrono::milliseconds interval);
void stop();