summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-20 10:57:46 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-20 10:57:46 +0200
commit60bce5566ae5c4fc912afbfd75080baf57fcedb2 (patch)
tree1b03a335b29dc21b45f2f372ba096ff2c0a83958 /searchcore
parent304fc2ea70fd82957565416554bfed190353d643 (diff)
Add vespa-redistribute-bm, a single-process benchmark of document redistribution.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp68
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/.gitignore1
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt8
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp627
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp55
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h8
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h4
13 files changed, 769 insertions, 46 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index c76f35bd9ff..c0353747e80 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -50,6 +50,7 @@ vespa_define_module(
src/apps/vespa-feed-bm
src/apps/vespa-gen-testdocs
src/apps/vespa-proton-cmd
+ src/apps/vespa-redistribute-bm
src/apps/vespa-transactionlog-inspect
TESTS
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 7f9ae442dd3..3dac66685f9 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -51,8 +51,6 @@ using search::bmcluster::BmNodeStatsReporter;
using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
-using storage::spi::PersistenceProvider;
-using vespalib::makeLambdaTask;
namespace {
@@ -114,71 +112,73 @@ BMParams::check() const
}
-struct PersistenceProviderFixture {
+class Benchmark {
+ BMParams _params;
std::shared_ptr<const DocumenttypesConfig> _document_types;
std::shared_ptr<const DocumentTypeRepo> _repo;
- std::unique_ptr<BmCluster> _bm_cluster;
+ std::unique_ptr<BmCluster> _cluster;
BmFeed _feed;
- explicit PersistenceProviderFixture(const BMParams& params);
- ~PersistenceProviderFixture();
+ void benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name);
+public:
+ explicit Benchmark(const BMParams& params);
+ ~Benchmark();
+ void run();
};
-PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
- : _document_types(make_document_types()),
+Benchmark::Benchmark(const BMParams& params)
+ : _params(params),
+ _document_types(make_document_types()),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
- _bm_cluster(std::make_unique<BmCluster>(base_dir, base_port, params, _document_types, _repo)),
+ _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
_feed(_repo)
{
- _bm_cluster->make_nodes();
+ _cluster->make_nodes();
}
-PersistenceProviderFixture::~PersistenceProviderFixture() = default;
+Benchmark::~Benchmark() = default;
void
-benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params, uint32_t passes, const vespalib::string &op_name)
+Benchmark::benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name)
{
if (passes == 0) {
return;
}
AvgSampler sampler;
LOG(info, "--------------------------------");
- LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), params.get_documents(), passes);
+ LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), _params.get_documents(), passes);
for (uint32_t pass = 0; pass < passes; ++pass) {
- feeder.run_feed_tasks(pass, time_bias, serialized_feed, params, sampler, op_name);
+ feeder.run_feed_tasks(pass, time_bias, serialized_feed, _params, sampler, op_name);
}
LOG(info, "%sAsync: AVG %s/s: %8.2f", op_name.c_str(), op_name.c_str(), sampler.avg());
}
-void benchmark(const BMParams &bm_params)
+void
+Benchmark::run()
{
- vespalib::rmdir(base_dir, true);
- PersistenceProviderFixture f(bm_params);
- auto& cluster = *f._bm_cluster;
- cluster.start(f._feed);
- vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki);
- BmFeeder feeder(f._repo, *cluster.get_feed_handler(), executor);
- auto& feed = f._feed;
- auto put_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
- auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
- auto get_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_get_feed(range, bucket_selector); }, f._feed.num_buckets(), "get");
- auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
- BmNodeStatsReporter reporter(cluster);
+ _cluster->start(_feed);
+ vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+ BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
+ auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
+ auto update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update");
+ auto get_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_get_feed(range, bucket_selector); }, _feed.num_buckets(), "get");
+ auto remove_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_remove_feed(range, bucket_selector); }, _feed.num_buckets(), "remove");
+ BmNodeStatsReporter reporter(*_cluster);
reporter.start(500ms);
int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
- benchmark_feed(feeder, time_bias, put_feed, bm_params, bm_params.get_put_passes(), "put");
+ benchmark_feed(feeder, time_bias, put_feed, _params.get_put_passes(), "put");
reporter.report_now();
- benchmark_feed(feeder, time_bias, update_feed, bm_params, bm_params.get_update_passes(), "update");
+ benchmark_feed(feeder, time_bias, update_feed, _params.get_update_passes(), "update");
reporter.report_now();
- benchmark_feed(feeder, time_bias, get_feed, bm_params, bm_params.get_get_passes(), "get");
+ benchmark_feed(feeder, time_bias, get_feed, _params.get_get_passes(), "get");
reporter.report_now();
- benchmark_feed(feeder, time_bias, remove_feed, bm_params, bm_params.get_remove_passes(), "remove");
+ benchmark_feed(feeder, time_bias, remove_feed, _params.get_remove_passes(), "remove");
reporter.report_now();
reporter.stop();
LOG(info, "--------------------------------");
- cluster.stop();
+ _cluster->stop();
}
class App : public FastOS_Application
@@ -382,7 +382,9 @@ App::Main()
usage();
return 1;
}
- benchmark(_bm_params);
+ vespalib::rmdir(base_dir, true);
+ Benchmark bm(_bm_params);
+ bm.run();
return 0;
}
diff --git a/searchcore/src/apps/vespa-redistribute-bm/.gitignore b/searchcore/src/apps/vespa-redistribute-bm/.gitignore
new file mode 100644
index 00000000000..4a7424f7ef4
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/.gitignore
@@ -0,0 +1 @@
+vespa-redistribute-bm
diff --git a/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt
new file mode 100644
index 00000000000..5b34c1aefb8
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_vespa_redistribute_bm_app
+ SOURCES
+ vespa_redistribute_bm.cpp
+ OUTPUT_NAME vespa-redistribute-bm
+ DEPENDS
+ searchcore_bmcluster
+)
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
new file mode 100644
index 00000000000..34cd091b615
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -0,0 +1,627 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/repo/configbuilder.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/fastos/app.h>
+#include <vespa/searchcore/bmcluster/avg_sampler.h>
+#include <vespa/searchcore/bmcluster/bm_cluster.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_controller.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_params.h>
+#include <vespa/searchcore/bmcluster/bm_distribution.h>
+#include <vespa/searchcore/bmcluster/bm_feed.h>
+#include <vespa/searchcore/bmcluster/bm_feeder.h>
+#include <vespa/searchcore/bmcluster/bm_feed_params.h>
+#include <vespa/searchcore/bmcluster/bm_node.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats_reporter.h>
+#include <vespa/searchcore/bmcluster/bm_range.h>
+#include <vespa/searchcore/bmcluster/bucket_selector.h>
+#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h>
+#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <getopt.h>
+#include <iostream>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP("vespa-redistribute-bm");
+
+using namespace proton;
+using namespace std::chrono_literals;
+
+using document::DocumentTypeRepo;
+using document::DocumentTypeRepoFactory;
+using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
+using search::bmcluster::AvgSampler;
+using search::bmcluster::BmClusterController;
+using search::bmcluster::IBmFeedHandler;
+using search::bmcluster::BmClusterParams;
+using search::bmcluster::BmCluster;
+using search::bmcluster::BmFeed;
+using search::bmcluster::BmFeedParams;
+using search::bmcluster::BmFeeder;
+using search::bmcluster::BmNode;
+using search::bmcluster::BmNodeStatsReporter;
+using search::bmcluster::BmRange;
+using search::bmcluster::BucketSelector;
+using search::index::DummyFileHeaderContext;
+using storage::lib::State;
+
+namespace {
+
+vespalib::string base_dir = "testdb";
+constexpr int base_port = 9017;
+
+/**
+ * Builds the document types config used by the benchmark: a single document
+ * type "test" (id 42) whose header struct has one int field named "int" and
+ * whose body struct is empty.
+ */
+std::shared_ptr<DocumenttypesConfig> make_document_types() {
+    namespace cfg = document::config_builder;
+    cfg::DocumenttypesConfigBuilderHelper helper;
+    helper.document(42, "test", cfg::Struct("test.header").addField("int", document::DataType::T_INT), cfg::Struct("test.body"));
+    return std::make_shared<DocumenttypesConfig>(helper.config());
+}
+
+// Redistribution scenario selected with --mode.
+// NOTE: the enumerator order must match the order of mode_names below,
+// since get_mode() / get_mode_name() convert between the two by index.
+enum class Mode {
+    GROW,
+    SHRINK,
+    PERM_CRASH,
+    TEMP_CRASH,
+    REPLACE,
+    BAD,
+};
+
+// Command line names for the valid modes, indexed by the numeric value of Mode.
+// Mode::BAD deliberately has no entry here.
+std::vector<vespalib::string> mode_names = {
+    "grow",
+    "shrink",
+    "perm-crash",
+    "temp-crash",
+    "replace"
+};
+
+// Name reported by get_mode_name() for any mode without an entry in mode_names.
+vespalib::string bad_mode_name("bad");
+
+/**
+ * Maps a mode name from the command line to the matching Mode value.
+ * Returns Mode::BAD when the name is not found in mode_names.
+ */
+Mode get_mode(const vespalib::string& mode_name) {
+    uint32_t index = 0;
+    for (const auto& candidate : mode_names) {
+        if (mode_name == candidate) {
+            return static_cast<Mode>(index);
+        }
+        ++index;
+    }
+    return Mode::BAD;
+}
+
+/**
+ * Returns the name for a mode; modes without an entry in mode_names
+ * (e.g. Mode::BAD) yield bad_mode_name.
+ */
+vespalib::string& get_mode_name(Mode mode) {
+    const auto index = static_cast<uint32_t>(mode);
+    if (index < mode_names.size()) {
+        return mode_names[index];
+    }
+    return bad_mode_name;
+}
+
+/**
+ * Computes the product of (lost_nodes - i) / (num_nodes - i) for
+ * i in [0, redundancy), and logs it.
+ *
+ * Returns 0.0 (without logging) when redundancy exceeds the number of lost
+ * nodes. Presumably this is the fraction of documents whose copies were all
+ * on lost nodes, assuming copies are spread uniformly.
+ */
+double
+estimate_lost_docs_base_ratio(uint32_t redundancy, uint32_t lost_nodes, uint32_t num_nodes)
+{
+    if (lost_nodes < redundancy) {
+        return 0.0;
+    }
+    double ratio = 1.0;
+    uint32_t copy = 0;
+    while (copy < redundancy) {
+        const double lost_left = static_cast<double>(lost_nodes - copy);
+        ratio *= lost_left / (num_nodes - copy);
+        ++copy;
+    }
+    LOG(info, "estimated lost docs base ratio: %4.2f", ratio);
+    return ratio;
+}
+
+/**
+ * Estimates the moved-documents ratio when added_nodes nodes are taken up:
+ * (redundancy / num_nodes) * added_nodes. The result is logged and returned.
+ */
+double
+estimate_moved_docs_ratio_grow(uint32_t redundancy, uint32_t added_nodes, uint32_t num_nodes)
+{
+    const double copies = redundancy;
+    const double per_node_share = copies / num_nodes;
+    const double ratio = per_node_share * added_nodes;
+    LOG(info, "estimated_moved_docs_ratio_grow(%u,%u,%u)=%4.2f", redundancy, added_nodes, num_nodes, ratio);
+    return ratio;
+}
+
+/**
+ * Estimates the moved-documents ratio when retired_nodes nodes are retired:
+ * the increase in per-node document share, multiplied by the number of
+ * remaining nodes. The redundancy after the change is capped at the number
+ * of remaining nodes. The result is logged and returned.
+ */
+double
+estimate_moved_docs_ratio_shrink(uint32_t redundancy, uint32_t retired_nodes, uint32_t num_nodes)
+{
+    const uint32_t remaining_nodes = num_nodes - retired_nodes;
+    const double copies_before = redundancy;
+    const double share_before = copies_before / num_nodes;
+    const double copies_after = std::min(redundancy, remaining_nodes);
+    const double share_after = copies_after / remaining_nodes;
+    const double ratio = (share_after - share_before) * remaining_nodes;
+    LOG(info, "estimated_moved_docs_ratio_shrink(%u,%u,%u)=%4.2f", redundancy, retired_nodes, num_nodes, ratio);
+    return ratio;
+}
+
+/**
+ * Estimates the moved-documents ratio when crashed_nodes nodes go down.
+ *
+ * Same share-increase formula as the shrink case, minus the estimated lost
+ * documents ratio (estimate_lost_docs_base_ratio() scaled by the redundancy
+ * after the crash). Despite "nodes" in the function name, the value is a
+ * document ratio, as the log message says.
+ */
+double
+estimate_moved_nodes_ratio_crash(uint32_t redundancy, uint32_t crashed_nodes, uint32_t num_nodes)
+{
+    const uint32_t surviving_nodes = num_nodes - crashed_nodes;
+    const double copies_before = redundancy;
+    const double share_before = copies_before / num_nodes;
+    const double copies_after = std::min(redundancy, surviving_nodes);
+    const double share_after = copies_after / surviving_nodes;
+    const double lost_ratio = estimate_lost_docs_base_ratio(redundancy, crashed_nodes, num_nodes) * copies_after;
+    const double ratio = (share_after - share_before) * surviving_nodes - lost_ratio;
+    LOG(info, "estimated_moved_docs_ratio_crash(%u,%u,%u)=%4.2f", redundancy, crashed_nodes, num_nodes, ratio);
+    return ratio;
+}
+
+/**
+ * Estimates the moved-documents ratio when added_nodes nodes are taken up
+ * while retired_nodes other nodes are retired.
+ *
+ * num_nodes is the total node count, including both the added and the
+ * retired nodes. Despite "nodes" in the function name, the value is a
+ * document ratio, as the log message says.
+ *
+ * NOTE: the "extra" term is marked as needing a fix (see TODO below), so the
+ * result should be treated as a rough estimate only.
+ */
+double
+estimate_moved_nodes_ratio_replace(uint32_t redundancy, uint32_t added_nodes, uint32_t retired_nodes, uint32_t num_nodes)
+{
+    // Nodes holding documents before the change (the added nodes are excluded).
+    uint32_t old_nodes = num_nodes - added_nodes;
+    double old_redundancy = std::min(redundancy, old_nodes);
+    double old_per_node_doc_ratio = old_redundancy / old_nodes;
+    // Nodes holding documents after the change (the retired nodes are excluded).
+    uint32_t new_nodes = num_nodes - retired_nodes;
+    double new_redundancy = std::min(redundancy, new_nodes);
+    double new_per_node_doc_ratio = new_redundancy / new_nodes;
+    // Documents that end up on the added nodes.
+    double moved_ratio = new_per_node_doc_ratio * added_nodes;
+    // Nodes that are neither added nor retired.
+    uint32_t stable_nodes = num_nodes - added_nodes - retired_nodes;
+    // Account for extra documents moved from retired nodes to stable nodes
+    // TODO: Fix calculation
+    double extra_moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * stable_nodes;
+    extra_moved_ratio += new_per_node_doc_ratio * stable_nodes * retired_nodes / new_nodes;
+    moved_ratio += extra_moved_ratio;
+    LOG(info, "estimated_moved_docs_ratio_replace(%u,%u,%u,%u)=%4.2f, (of which %4.2f extra)", redundancy, added_nodes, retired_nodes, num_nodes, moved_ratio, extra_moved_ratio);
+    return moved_ratio;
+}
+
+/**
+ * Parameters for the redistribution benchmark: the cluster and feed
+ * parameters from the base classes plus the settings specific to this tool.
+ */
+class BMParams : public BmClusterParams,
+                 public BmFeedParams
+{
+    uint32_t _flip_nodes;      // number of nodes whose state is changed (--flip-nodes)
+    Mode     _mode;            // redistribution scenario (--mode)
+    bool     _use_feed_settle; // sleep after feed (and between temp-crash phases) when set
+public:
+    BMParams();
+    uint32_t get_flip_nodes() const noexcept { return _flip_nodes; }
+    Mode get_mode() const noexcept { return _mode; }
+    bool get_use_feed_settle() const noexcept { return _use_feed_settle; }
+    void set_flip_nodes(uint32_t value) { _flip_nodes = value; }
+    void set_mode(Mode value) { _mode = value; }
+    void set_use_feed_settle(bool value) { _use_feed_settle = value; }
+    // Validates the parameters; prints a reason to std::cerr and returns false on failure.
+    bool check() const;
+};
+
+// Defaults: one flip node, grow mode, no feed settle.
+// The base class defaults are then overridden to enable the service layer,
+// the distributor and the document API, and to use 4 nodes.
+BMParams::BMParams()
+    : BmClusterParams(),
+      BmFeedParams(),
+      _flip_nodes(1u),
+      _mode(Mode::GROW),
+      _use_feed_settle(false)
+{
+    set_enable_service_layer(true);
+    set_enable_distributor(true);
+    set_use_document_api(true);
+    set_num_nodes(4);
+}
+
+
+/**
+ * Validates the benchmark parameters.
+ *
+ * Runs the base class checks first, then requires at least 2 nodes, a known
+ * mode, and a flip node count that fits the node count for the selected mode:
+ *  - replace mode flips two disjoint groups of _flip_nodes nodes, so
+ *    2 * _flip_nodes must not exceed the node count;
+ *  - all other modes require _flip_nodes to be less than the node count.
+ *
+ * @return true when all checks pass; otherwise a reason is written to
+ *         std::cerr and false is returned.
+ */
+bool
+BMParams::check() const
+{
+    if (!BmClusterParams::check()) {
+        return false;
+    }
+    if (!BmFeedParams::check()) {
+        return false;
+    }
+    if (get_num_nodes() < 2u) {
+        std::cerr << "Too few nodes: " << get_num_nodes() << std::endl;
+        return false;
+    }
+    // Reject an unknown mode before the mode specific checks, so that the
+    // flip node diagnostics are never reported for a "bad" mode.
+    if (_mode == Mode::BAD) {
+        std::cerr << "Bad mode" << std::endl;
+        return false;
+    }
+    if (_mode == Mode::REPLACE) {
+        // Compare against half the node count instead of doubling _flip_nodes:
+        // the multiplication could wrap around for huge values (e.g. a
+        // negative command line argument converted to uint32_t) and let an
+        // invalid flip node count through.
+        if (_flip_nodes > get_num_nodes() / 2) {
+            std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (replace mode)" << std::endl;
+            return false;
+        }
+    } else {
+        if (_flip_nodes >= get_num_nodes()) {
+            std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (" << get_mode_name(_mode) << " mode)" << std::endl;
+            return false;
+        }
+    }
+    return true;
+}
+
+}
+
+/**
+ * Single-process benchmark of document redistribution: feeds documents into
+ * a cluster, changes the cluster state according to the selected mode and
+ * measures how long the resulting redistribution takes.
+ */
+class Benchmark {
+    // Members are initialized in declaration order; _cluster is constructed
+    // from _params, _document_types and _repo, and _feed from _repo.
+    BMParams                                   _params;
+    std::shared_ptr<const DocumenttypesConfig> _document_types;
+    std::shared_ptr<const DocumentTypeRepo>    _repo;
+    std::unique_ptr<BmCluster>                 _cluster;
+    BmFeed                                     _feed;
+
+    void adjust_cluster_state_before_feed();
+    void adjust_cluster_state_after_feed();
+    void adjust_cluster_state_after_first_redistribution();
+    double estimate_lost_docs();
+    double estimate_moved_docs();
+    void feed();
+    std::chrono::duration<double> redistribute();
+
+public:
+    explicit Benchmark(const BMParams& params);
+    ~Benchmark();
+    // Runs the whole benchmark: feed, cluster state change, redistribution.
+    void run();
+};
+
+// Copies the parameters, builds the document type repo and the cluster
+// (rooted at base_dir, using ports from base_port), then creates the nodes.
+// The cluster is given the member copy _params, not the constructor argument.
+Benchmark::Benchmark(const BMParams& params)
+    : _params(params),
+      _document_types(make_document_types()),
+      _repo(document::DocumentTypeRepoFactory::make(*_document_types)),
+      _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
+      _feed(_repo)
+{
+    _cluster->make_nodes();
+}
+
+Benchmark::~Benchmark() = default;
+
+/**
+ * Prepares the cluster state used while feeding.
+ *
+ * In grow and replace mode the first flip-nodes nodes are marked down, so
+ * they can be taken up after the feed. Other modes leave the node states
+ * untouched. The cluster state change is committed in all cases.
+ */
+void
+Benchmark::adjust_cluster_state_before_feed()
+{
+    auto& distribution = _cluster->get_real_distribution();
+    const Mode mode = _params.get_mode();
+    auto& name = get_mode_name(mode);
+    if (mode == Mode::GROW || mode == Mode::REPLACE) {
+        const uint32_t flip_nodes = _params.get_flip_nodes();
+        for (uint32_t node = 0; node < flip_nodes; ++node) {
+            distribution.set_node_state(node, State::DOWN);
+        }
+        LOG(info, "Mode %s: Taking down %u node(s) initially", name.c_str(), flip_nodes);
+    } else {
+        LOG(info, "Mode %s: No cluster state adjust before feed", name.c_str());
+    }
+    distribution.commit_cluster_state_change();
+}
+
+/**
+ * Applies the cluster state change that triggers redistribution:
+ *  - grow:       nodes [0, flip) are taken up
+ *  - shrink:     nodes [0, flip) are retired
+ *  - perm-crash,
+ *    temp-crash: nodes [0, flip) are taken down
+ *  - replace:    nodes [0, flip) are taken up and nodes [flip, 2*flip) are retired
+ * where flip is the configured number of flip nodes. The cluster state
+ * change is committed in all cases.
+ */
+void
+Benchmark::adjust_cluster_state_after_feed()
+{
+    auto& distribution = _cluster->get_real_distribution();
+    const Mode mode = _params.get_mode();
+    const uint32_t flip_nodes = _params.get_flip_nodes();
+    auto& name = get_mode_name(mode);
+    // Sets the state of flip_nodes consecutive nodes, starting at first_node.
+    auto set_states = [&distribution, flip_nodes](uint32_t first_node, const State& state) {
+        for (uint32_t offset = 0; offset < flip_nodes; ++offset) {
+            distribution.set_node_state(offset + first_node, state);
+        }
+    };
+    switch (mode) {
+    case Mode::GROW:
+        set_states(0, State::UP);
+        LOG(info, "Mode %s: taking up %u node(s)", name.c_str(), flip_nodes);
+        break;
+    case Mode::SHRINK:
+        set_states(0, State::RETIRED);
+        LOG(info, "Mode %s: Retiring %u node(s)", name.c_str(), flip_nodes);
+        break;
+    case Mode::PERM_CRASH:
+    case Mode::TEMP_CRASH:
+        set_states(0, State::DOWN);
+        LOG(info, "Mode %s: taking down %u node(s)", name.c_str(), flip_nodes);
+        break;
+    case Mode::REPLACE:
+        set_states(0, State::UP);
+        set_states(flip_nodes, State::RETIRED);
+        LOG(info, "Mode %s: Taking up %u node(s) and retiring %u node(s)", name.c_str(), flip_nodes, flip_nodes);
+        break;
+    default:
+        LOG(info, "Mode %s: No cluster state adjust after feed", name.c_str());
+    }
+    distribution.commit_cluster_state_change();
+}
+
+/**
+ * Applies the second cluster state change, used by temp-crash mode only:
+ * the nodes taken down after the feed are taken up again. Other modes leave
+ * the node states untouched. The cluster state change is committed in all
+ * cases.
+ */
+void
+Benchmark::adjust_cluster_state_after_first_redistribution()
+{
+    auto& distribution = _cluster->get_real_distribution();
+    const Mode mode = _params.get_mode();
+    auto& name = get_mode_name(mode);
+    if (mode == Mode::TEMP_CRASH) {
+        const uint32_t flip_nodes = _params.get_flip_nodes();
+        for (uint32_t node = 0; node < flip_nodes; ++node) {
+            distribution.set_node_state(node, State::UP);
+        }
+        LOG(info, "Mode %s: taking up %u node(s)", name.c_str(), flip_nodes);
+    } else {
+        LOG(info, "Mode %s: No cluster state adjust after first redistribution", name.c_str());
+    }
+    distribution.commit_cluster_state_change();
+}
+
+/**
+ * Feeds the configured documents as a single pass of put operations,
+ * reporting node stats while the feed runs.
+ */
+void
+Benchmark::feed()
+{
+    // Executor used both to generate the feed and to run the feed tasks.
+    // NOTE(review): 128_Ki is presumably the per-thread stack size - confirm.
+    vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+    BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
+    auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
+    BmNodeStatsReporter reporter(*_cluster);
+    reporter.start(500ms);
+    // Time bias: current wall clock time minus 24 hours, in microseconds.
+    int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
+    LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
+    AvgSampler sampler;
+    // Single pass (pass number 0) of put operations.
+    feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put");
+    reporter.report_now();
+    if (_params.get_use_feed_settle()) {
+        // Optional fixed delay after the feed, followed by another stats report.
+        LOG(info, "Settling feed");
+        std::this_thread::sleep_for(2s);
+        reporter.report_now();
+    }
+}
+
+
+/**
+ * Propagates the committed cluster state and waits for redistribution to
+ * finish.
+ *
+ * Completion is detected by polling: the loop ends when at least 6 seconds
+ * have passed since the change time reported by the stats reporter.
+ * NOTE(review): presumably the change time is when the reported node stats
+ * last changed - confirm against BmNodeStatsReporter.
+ *
+ * @return time from just before the reporter was started until the last
+ *         change time seen by the reporter.
+ */
+std::chrono::duration<double>
+Benchmark::redistribute()
+{
+    BmNodeStatsReporter reporter(*_cluster);
+    auto before = std::chrono::steady_clock::now();
+    reporter.start(500ms);
+    _cluster->propagate_cluster_state();
+    reporter.report_now();
+    for (;;) {
+        auto duration = std::chrono::steady_clock::now() - reporter.get_change_time();
+        if (duration >= 6s) {
+            break;
+        }
+        // Poll interval while waiting for the change time to stop advancing.
+        std::this_thread::sleep_for(100ms);
+    }
+    return reporter.get_change_time() - before;
+}
+
+/**
+ * Estimates the number of lost documents for the selected mode.
+ *
+ * Only the crash modes (perm-crash, temp-crash) yield a nonzero value: the
+ * document count times the base loss ratio, scaled by the redundancy after
+ * the crash (capped at the number of remaining nodes).
+ */
+double
+Benchmark::estimate_lost_docs()
+{
+    const Mode mode = _params.get_mode();
+    if (mode != Mode::PERM_CRASH && mode != Mode::TEMP_CRASH) {
+        return 0.0;
+    }
+    const auto redundancy = _params.get_redundancy();
+    const auto num_nodes = _params.get_num_nodes();
+    const auto flip_nodes = _params.get_flip_nodes();
+    const double redundancy_after = std::min(redundancy, num_nodes - flip_nodes);
+    const double lost_ratio = estimate_lost_docs_base_ratio(redundancy, flip_nodes, num_nodes) * redundancy_after;
+    return _params.get_documents() * lost_ratio;
+}
+
+/**
+ * Estimates the number of moved documents for the selected mode: the
+ * document count times the mode specific moved-documents ratio.
+ * Returns 0.0 for modes without an estimator.
+ */
+double
+Benchmark::estimate_moved_docs()
+{
+    const auto redundancy = _params.get_redundancy();
+    const auto flip_nodes = _params.get_flip_nodes();
+    const auto num_nodes = _params.get_num_nodes();
+    double ratio = 0.0;
+    switch (_params.get_mode()) {
+    case Mode::GROW:
+        ratio = estimate_moved_docs_ratio_grow(redundancy, flip_nodes, num_nodes);
+        break;
+    case Mode::SHRINK:
+        ratio = estimate_moved_docs_ratio_shrink(redundancy, flip_nodes, num_nodes);
+        break;
+    case Mode::PERM_CRASH:
+    case Mode::TEMP_CRASH:
+        ratio = estimate_moved_nodes_ratio_crash(redundancy, flip_nodes, num_nodes);
+        break;
+    case Mode::REPLACE:
+        // The same number of nodes is added and retired in replace mode.
+        ratio = estimate_moved_nodes_ratio_replace(redundancy, flip_nodes, flip_nodes, num_nodes);
+        break;
+    default:
+        return 0.0;
+    }
+    return _params.get_documents() * ratio;
+}
+
+/**
+ * Runs the benchmark:
+ *  1. set the initial cluster state, start the cluster and feed documents
+ *  2. change the cluster state and time the resulting redistribution
+ *  3. temp-crash mode only: take the crashed nodes up again and time the
+ *     second redistribution
+ *  4. stop the cluster
+ * Moved and lost document counts in the log output are estimates, not
+ * measurements.
+ */
+void
+Benchmark::run()
+{
+    adjust_cluster_state_before_feed();
+    _cluster->start(_feed);
+    feed();
+    LOG(info, "--------------------------------");
+    adjust_cluster_state_after_feed();
+    auto elapsed = redistribute();
+    double moved_docs = estimate_moved_docs();
+    double lost_docs = estimate_lost_docs();
+    LOG(info, "Redistributed estimated %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f lost docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs);
+    if (_params.get_mode() == Mode::TEMP_CRASH) {
+        if (_params.get_use_feed_settle()) {
+            LOG(info, "Settling redistribution");
+            std::this_thread::sleep_for(2s);
+        }
+        adjust_cluster_state_after_first_redistribution();
+        elapsed = redistribute();
+        // The estimates from the first redistribution are reused for the cleanup phase.
+        LOG(info, "Cleanup of %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f refound docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs);
+    }
+    _cluster->stop();
+}
+
+/**
+ * Command line application: parses the options into BMParams and runs the
+ * redistribution benchmark.
+ */
+class App : public FastOS_Application
+{
+    BMParams _bm_params;
+public:
+    App();
+    ~App() override;
+    // Prints the usage text to std::cerr.
+    void usage();
+    // Parses the command line; returns false on bad options or invalid parameters.
+    bool get_options();
+    int Main() override;
+};
+
+// Starts out with the default benchmark parameters (see BMParams::BMParams()).
+App::App()
+    : _bm_params()
+{
+}
+
+App::~App() = default;
+
+/**
+ * Prints the program version and the list of supported command line options
+ * to std::cerr.
+ */
+void
+App::usage()
+{
+    std::cerr <<
+        "vespa-redistribute-bm version 0.0\n"
+        "\n"
+        "USAGE:\n";
+    std::cerr <<
+        "vespa-redistribute-bm\n"
+        "[--bucket-db-stripe-bits bits]\n"
+        "[--client-threads threads]\n"
+        "[--distributor-stripes stripes]\n"
+        "[--documents documents]\n"
+        "[--flip-nodes flip-nodes]\n"
+        "[--indexing-sequencer [latency,throughput,adaptive]]\n"
+        "[--max-pending max-pending]\n"
+        // Closing bracket added: the option was previously printed unbalanced.
+        "[--mode [grow, shrink, perm-crash, temp-crash, replace]]\n"
+        "[--nodes nodes]\n"
+        "[--redundancy redundancy]\n"
+        "[--rpc-events-before-wakeup events]\n"
+        "[--rpc-network-threads threads]\n"
+        "[--rpc-targets-per-node targets]\n"
+        "[--response-threads threads]\n"
+        "[--skip-communicationmanager-thread]\n"
+        "[--use-async-message-handling]\n"
+        "[--use-feed-settle]" << std::endl;
+}
+
+/**
+ * Parses the command line into _bm_params.
+ *
+ * Only long options are supported. The entries of long_opts and the
+ * enumerators of longopts_enum must be kept in the same order, since the
+ * index reported by the option parser is used directly as the enum value.
+ *
+ * @return false when an option is not recognized or when the resulting
+ *         parameters fail BMParams::check(); true otherwise.
+ */
+bool
+App::get_options()
+{
+    int c;
+    const char *opt_argument = nullptr;
+    int long_opt_index = 0;
+    static struct option long_opts[] = {
+        { "bucket-db-stripe-bits", 1, nullptr, 0 },
+        { "client-threads", 1, nullptr, 0 },
+        { "distributor-stripes", 1, nullptr, 0 },
+        { "documents", 1, nullptr, 0 },
+        { "flip-nodes", 1, nullptr, 0 },
+        { "indexing-sequencer", 1, nullptr, 0 },
+        { "max-pending", 1, nullptr, 0 },
+        { "mode", 1, nullptr, 0 },
+        { "nodes", 1, nullptr, 0 },
+        { "redundancy", 1, nullptr, 0 },
+        { "response-threads", 1, nullptr, 0 },
+        { "rpc-events-before-wakeup", 1, nullptr, 0 },
+        { "rpc-network-threads", 1, nullptr, 0 },
+        { "rpc-targets-per-node", 1, nullptr, 0 },
+        { "skip-communicationmanager-thread", 0, nullptr, 0 },
+        { "use-async-message-handling", 0, nullptr, 0 },
+        { "use-feed-settle", 0, nullptr, 0 },
+        // Terminator: getopt_long() requires the last array element to be
+        // all zeros. Without it, an unknown option makes the scan read past
+        // the end of the array.
+        { nullptr, 0, nullptr, 0 }
+    };
+    enum longopts_enum {
+        LONGOPT_BUCKET_DB_STRIPE_BITS,
+        LONGOPT_CLIENT_THREADS,
+        LONGOPT_DISTRIBUTOR_STRIPES,
+        LONGOPT_DOCUMENTS,
+        LONGOPT_FLIP_NODES,
+        LONGOPT_INDEXING_SEQUENCER,
+        LONGOPT_MAX_PENDING,
+        LONGOPT_MODE,
+        LONGOPT_NODES,
+        LONGOPT_REDUNDANCY,
+        LONGOPT_RESPONSE_THREADS,
+        LONGOPT_RPC_EVENTS_BEFORE_WAKEUP,
+        LONGOPT_RPC_NETWORK_THREADS,
+        LONGOPT_RPC_TARGETS_PER_NODE,
+        LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD,
+        LONGOPT_USE_ASYNC_MESSAGE_HANDLING,
+        LONGOPT_USE_FEED_SETTLE
+    };
+    int opt_index = 1;
+    resetOptIndex(opt_index);
+    while ((c = GetOptLong("", opt_argument, opt_index, long_opts, &long_opt_index)) != -1) {
+        switch (c) {
+        case 0:
+            switch(long_opt_index) {
+            case LONGOPT_BUCKET_DB_STRIPE_BITS:
+                _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument));
+                break;
+            case LONGOPT_CLIENT_THREADS:
+                _bm_params.set_client_threads(atoi(opt_argument));
+                break;
+            case LONGOPT_DISTRIBUTOR_STRIPES:
+                _bm_params.set_distributor_stripes(atoi(opt_argument));
+                break;
+            case LONGOPT_DOCUMENTS:
+                _bm_params.set_documents(atoi(opt_argument));
+                break;
+            case LONGOPT_FLIP_NODES:
+                _bm_params.set_flip_nodes(atoi(opt_argument));
+                break;
+            case LONGOPT_INDEXING_SEQUENCER:
+                _bm_params.set_indexing_sequencer(opt_argument);
+                break;
+            case LONGOPT_MAX_PENDING:
+                _bm_params.set_max_pending(atoi(opt_argument));
+                break;
+            case LONGOPT_MODE:
+                // An unknown name is stored as Mode::BAD and rejected by
+                // BMParams::check() once all options have been parsed.
+                _bm_params.set_mode(get_mode(opt_argument));
+                if (_bm_params.get_mode() == Mode::BAD) {
+                    std::cerr << "Unknown mode name " << opt_argument << std::endl;
+                }
+                break;
+            case LONGOPT_NODES:
+                _bm_params.set_num_nodes(atoi(opt_argument));
+                break;
+            case LONGOPT_REDUNDANCY:
+                _bm_params.set_redundancy(atoi(opt_argument));
+                break;
+            case LONGOPT_RESPONSE_THREADS:
+                _bm_params.set_response_threads(atoi(opt_argument));
+                break;
+            case LONGOPT_RPC_EVENTS_BEFORE_WAKEUP:
+                _bm_params.set_rpc_events_before_wakeup(atoi(opt_argument));
+                break;
+            case LONGOPT_RPC_NETWORK_THREADS:
+                _bm_params.set_rpc_network_threads(atoi(opt_argument));
+                break;
+            case LONGOPT_RPC_TARGETS_PER_NODE:
+                _bm_params.set_rpc_targets_per_node(atoi(opt_argument));
+                break;
+            case LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD:
+                _bm_params.set_skip_communicationmanager_thread(true);
+                break;
+            case LONGOPT_USE_ASYNC_MESSAGE_HANDLING:
+                _bm_params.set_use_async_message_handling_on_schedule(true);
+                break;
+            case LONGOPT_USE_FEED_SETTLE:
+                _bm_params.set_use_feed_settle(true);
+                break;
+            default:
+                return false;
+            }
+            break;
+        default:
+            return false;
+        }
+    }
+    return _bm_params.check();
+}
+
+/**
+ * Application entry point: parses the options, removes any old base
+ * directory and runs the benchmark.
+ *
+ * @return 0 on success, 1 (after printing usage) when option parsing fails.
+ */
+int
+App::Main()
+{
+    if (get_options()) {
+        vespalib::rmdir(base_dir, true);
+        Benchmark benchmark(_bm_params);
+        benchmark.run();
+        return 0;
+    }
+    usage();
+    return 1;
+}
+
+// Program entry point: runs the application and removes the base directory
+// afterwards, whatever the exit value.
+int
+main(int argc, char* argv[])
+{
+    DummyFileHeaderContext::setCreator("vespa-redistribute-bm");
+    App app;
+    auto exit_value = app.Entry(argc, argv);
+    vespalib::rmdir(base_dir, true);
+    return exit_value;
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
index 36e1e394f40..9fc0b26fff5 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -148,7 +148,8 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl
_document_types(std::move(document_types)),
_repo(std::move(repo)),
_field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
- _distribution(std::make_shared<const BmDistribution>(params.get_num_nodes())),
+ _real_distribution(std::make_shared<BmDistribution>(params.get_num_nodes(), params.get_redundancy())),
+ _distribution(_real_distribution),
_nodes(params.get_num_nodes()),
_cluster_controller(std::make_shared<BmClusterController>(*this, *_distribution)),
_feed_handler()
@@ -438,4 +439,10 @@ BmCluster::get_node_stats()
return node_stats;
}
+// Forwards to the cluster controller, which propagates the cluster state.
+void
+BmCluster::propagate_cluster_state()
+{
+    _cluster_controller->propagate_cluster_state();
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
index 8615b44bd7e..62ec710c296 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
@@ -26,6 +26,7 @@ namespace storage::rpc { class SharedRpcResources; }
namespace search::bmcluster {
class BmClusterController;
+class BmDistribution;
class BmFeed;
class BmMessageBus;
class BmNode;
@@ -55,6 +56,7 @@ class BmCluster {
std::shared_ptr<DocumenttypesConfig> _document_types;
std::shared_ptr<const document::DocumentTypeRepo> _repo;
std::unique_ptr<const document::FieldSetRepo> _field_set_repo;
+ std::shared_ptr<BmDistribution> _real_distribution;
std::shared_ptr<const IBmDistribution> _distribution;
std::vector<std::unique_ptr<BmNode>> _nodes;
std::shared_ptr<BmClusterController> _cluster_controller;
@@ -90,6 +92,8 @@ public:
uint32_t get_num_nodes() const { return _nodes.size(); }
BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; }
std::vector<BmNodeStats> get_node_stats();
+ BmDistribution& get_real_distribution() { return *_real_distribution; }
+ void propagate_cluster_state();
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
index 7766ab6c5b3..2a214130392 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
@@ -12,6 +12,7 @@ BmClusterParams::BmClusterParams()
_enable_service_layer(false),
_indexing_sequencer(),
_num_nodes(1),
+ _redundancy(1),
_response_threads(2), // Same default as in stor-filestor.def
_rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def
_rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def
@@ -42,6 +43,10 @@ BmClusterParams::check() const
std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl;
return false;
}
+ if (_num_nodes < _redundancy) {
+ std::cerr << "Too high redundancy " << _redundancy << " with " << _num_nodes << " nodes" << std::endl;
+ return false;
+ }
return true;
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
index 5bc6b97487c..68dff7f52ed 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -18,6 +18,7 @@ class BmClusterParams
bool _enable_service_layer;
vespalib::string _indexing_sequencer;
uint32_t _num_nodes;
+ uint32_t _redundancy;
uint32_t _response_threads;
uint32_t _rpc_events_before_wakeup;
uint32_t _rpc_network_threads;
@@ -36,6 +37,7 @@ public:
bool get_enable_distributor() const { return _enable_distributor; }
const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_num_nodes() const { return _num_nodes; }
+ uint32_t get_redundancy() const { return _redundancy; }
uint32_t get_response_threads() const { return _response_threads; }
uint32_t get_rpc_events_before_wakeup() const { return _rpc_events_before_wakeup; }
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
@@ -55,6 +57,7 @@ public:
void set_enable_service_layer(bool value) { _enable_service_layer = value; }
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_num_nodes(uint32_t value) { _num_nodes = value; }
+ void set_redundancy(uint32_t value) { _redundancy = value; }
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp
index 44b253b3d35..2ab1d4fd7f7 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp
@@ -5,7 +5,12 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <cassert>
+using storage::lib::ClusterState;
using storage::lib::ClusterStateBundle;
+using storage::lib::Node;
+using storage::lib::NodeState;
+using storage::lib::NodeType;
+using storage::lib::State;
namespace search::bmcluster {
@@ -14,7 +19,7 @@ using DistributionConfigBuilder = BmDistribution::DistributionConfigBuilder;
namespace {
BmDistribution::DistributionConfig
-make_distribution_config(uint32_t num_nodes)
+make_distribution_config(uint32_t num_nodes, uint32_t redundancy)
{
DistributionConfigBuilder dc;
{
@@ -32,27 +37,28 @@ make_distribution_config(uint32_t num_nodes)
group.partitions = "";
dc.group.push_back(std::move(group));
}
- dc.redundancy = 1;
- dc.readyCopies = 1;
+ dc.redundancy = redundancy;
+ dc.readyCopies = redundancy;
return dc;
}
-ClusterStateBundle
-make_cluster_state_bundle(uint32_t num_nodes)
+ClusterState
+make_cluster_state(uint32_t num_nodes)
{
vespalib::asciistream s;
s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes;
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str()));
- return bundle;
+ return storage::lib::ClusterState(s.str());
}
}
-BmDistribution::BmDistribution(uint32_t num_nodes)
+BmDistribution::BmDistribution(uint32_t num_nodes, uint32_t redundancy)
: _num_nodes(num_nodes),
- _distribution_config(make_distribution_config(num_nodes)),
+ _distribution_config(make_distribution_config(num_nodes, redundancy)),
_distribution(_distribution_config),
- _cluster_state_bundle(make_cluster_state_bundle(num_nodes))
+ _pending_cluster_state(make_cluster_state(num_nodes)),
+ _cluster_state_bundle(_pending_cluster_state),
+ _has_pending_cluster_state(false)
{
}
@@ -95,4 +101,33 @@ BmDistribution::get_cluster_state_bundle() const
return _cluster_state_bundle;
}
+// Stages a state change for a single node in the pending cluster state.
+// 'distributor' selects the distributor node type, otherwise the storage
+// node type is used. The pending version is bumped only for the first
+// staged change; later changes before commit_cluster_state_change() reuse it.
+void
+BmDistribution::set_node_state(uint32_t node_idx, bool distributor, const State& state)
+{
+    const NodeType& type = distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE;
+    _pending_cluster_state.setNodeState(Node(type, node_idx), NodeState(type, state));
+    if (_has_pending_cluster_state) {
+        return; // Version was already bumped by an earlier staged change.
+    }
+    const auto next_version = _pending_cluster_state.getVersion() + 1;
+    _pending_cluster_state.setVersion(next_version);
+    _has_pending_cluster_state = true;
+}
+
+// Stages the same state for both roles of a node index: first the storage
+// node, then the distributor.
+void
+BmDistribution::set_node_state(uint32_t node_idx, const State& state)
+{
+    constexpr bool as_storage = false;
+    constexpr bool as_distributor = true;
+    set_node_state(node_idx, as_storage, state);
+    set_node_state(node_idx, as_distributor, state);
+}
+
+// Publishes staged node state changes: rebuilds the cluster state bundle from
+// the pending cluster state and clears the pending flag. Does nothing when no
+// change has been staged.
+void
+BmDistribution::commit_cluster_state_change()
+{
+    if (!_has_pending_cluster_state) {
+        return;
+    }
+    _cluster_state_bundle = ClusterStateBundle(_pending_cluster_state);
+    _has_pending_cluster_state = false;
+}
+
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h
index fde89a0d766..6be7592f561 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h
@@ -19,16 +19,20 @@ class BmDistribution : public IBmDistribution
uint32_t _num_nodes;
DistributionConfigBuilder _distribution_config;
storage::lib::Distribution _distribution;
+ storage::lib::ClusterState _pending_cluster_state;
storage::lib::ClusterStateBundle _cluster_state_bundle;
-
+ bool _has_pending_cluster_state;
public:
- BmDistribution(uint32_t num_nodes);
+ BmDistribution(uint32_t num_nodes, uint32_t redundancy);
~BmDistribution() override;
uint32_t get_num_nodes() const override;
uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const override;
uint32_t get_distributor_node_idx(const document::Bucket & bucket) const override;
DistributionConfig get_distribution_config() const override;
storage::lib::ClusterStateBundle get_cluster_state_bundle() const override;
+ void set_node_state(uint32_t node_idx, bool distributor, const storage::lib::State& state);
+ void set_node_state(uint32_t node_idx, const storage::lib::State& state);
+ void commit_cluster_state_change();
};
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
index 4d464923efa..83956dfc274 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp
@@ -16,11 +16,29 @@ using vespalib::makeLambdaTask;
namespace search::bmcluster {
+namespace {
+
+// Returns true only when bucket stats are available, marked valid, and report
+// zero pending buckets. Missing or not-yet-valid stats count as not steady.
+bool steady_buckets_stats(const std::optional<BmBucketsStats> buckets)
+{
+    return buckets.has_value()                 // Info must be available
+        && buckets.value().get_valid()          // ... and complete
+        && (buckets.value().get_buckets_pending() == 0u);
+}
+
+}
+
BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster)
: _cluster(cluster),
_executor(1, 128_Ki),
_mutex(),
_cond(),
+ _change_time(),
+ _prev_node_stats(),
_pending_report(1u),
_started(false),
_stop(false)
@@ -99,6 +117,10 @@ BmNodeStatsReporter::report()
}
vespalib::string ss(s.str());
LOG(info, "%s", ss.c_str());
+ if (!(node_stats == _prev_node_stats) || !steady_buckets_stats(total_buckets)) {
+ _change_time = std::chrono::steady_clock::now();
+ _prev_node_stats = node_stats;
+ }
}
void
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
index 356d35474d1..1c8f013f5e6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h
@@ -10,6 +10,7 @@
namespace search::bmcluster {
class BmCluster;
+class BmNodeStats;
/*
* Class handling background reporting of node stats during feed or
@@ -20,6 +21,8 @@ class BmNodeStatsReporter {
vespalib::ThreadStackExecutor _executor;
std::mutex _mutex;
std::condition_variable _cond;
+ std::chrono::time_point<std::chrono::steady_clock> _change_time;
+ std::vector<BmNodeStats> _prev_node_stats;
uint32_t _pending_report;
bool _started;
bool _stop;
@@ -32,6 +35,7 @@ public:
void start(std::chrono::milliseconds interval);
void stop();
void report_now();
+ std::chrono::time_point<std::chrono::steady_clock> get_change_time() const noexcept { return _change_time; }
};
}