aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/apps
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-20 10:57:46 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-20 10:57:46 +0200
commit60bce5566ae5c4fc912afbfd75080baf57fcedb2 (patch)
tree1b03a335b29dc21b45f2f372ba096ff2c0a83958 /searchcore/src/apps
parent304fc2ea70fd82957565416554bfed190353d643 (diff)
Add vespa-redistribute-bm, a single-process benchmark of document redistribution.
Diffstat (limited to 'searchcore/src/apps')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp68
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/.gitignore1
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt8
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp627
4 files changed, 671 insertions, 33 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 7f9ae442dd3..3dac66685f9 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -51,8 +51,6 @@ using search::bmcluster::BmNodeStatsReporter;
using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
-using storage::spi::PersistenceProvider;
-using vespalib::makeLambdaTask;
namespace {
@@ -114,71 +112,73 @@ BMParams::check() const
}
-struct PersistenceProviderFixture {
+class Benchmark {
+ BMParams _params;
std::shared_ptr<const DocumenttypesConfig> _document_types;
std::shared_ptr<const DocumentTypeRepo> _repo;
- std::unique_ptr<BmCluster> _bm_cluster;
+ std::unique_ptr<BmCluster> _cluster;
BmFeed _feed;
- explicit PersistenceProviderFixture(const BMParams& params);
- ~PersistenceProviderFixture();
+ void benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name);
+public:
+ explicit Benchmark(const BMParams& params);
+ ~Benchmark();
+ void run();
};
-PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
- : _document_types(make_document_types()),
+Benchmark::Benchmark(const BMParams& params)
+ : _params(params),
+ _document_types(make_document_types()),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
- _bm_cluster(std::make_unique<BmCluster>(base_dir, base_port, params, _document_types, _repo)),
+ _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
_feed(_repo)
{
- _bm_cluster->make_nodes();
+ _cluster->make_nodes();
}
-PersistenceProviderFixture::~PersistenceProviderFixture() = default;
+Benchmark::~Benchmark() = default;
void
-benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params, uint32_t passes, const vespalib::string &op_name)
+Benchmark::benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name)
{
if (passes == 0) {
return;
}
AvgSampler sampler;
LOG(info, "--------------------------------");
- LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), params.get_documents(), passes);
+ LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), _params.get_documents(), passes);
for (uint32_t pass = 0; pass < passes; ++pass) {
- feeder.run_feed_tasks(pass, time_bias, serialized_feed, params, sampler, op_name);
+ feeder.run_feed_tasks(pass, time_bias, serialized_feed, _params, sampler, op_name);
}
LOG(info, "%sAsync: AVG %s/s: %8.2f", op_name.c_str(), op_name.c_str(), sampler.avg());
}
-void benchmark(const BMParams &bm_params)
+void
+Benchmark::run()
{
- vespalib::rmdir(base_dir, true);
- PersistenceProviderFixture f(bm_params);
- auto& cluster = *f._bm_cluster;
- cluster.start(f._feed);
- vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki);
- BmFeeder feeder(f._repo, *cluster.get_feed_handler(), executor);
- auto& feed = f._feed;
- auto put_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
- auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
- auto get_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_get_feed(range, bucket_selector); }, f._feed.num_buckets(), "get");
- auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
- BmNodeStatsReporter reporter(cluster);
+ _cluster->start(_feed);
+ vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+ BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
+ auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
+ auto update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update");
+ auto get_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_get_feed(range, bucket_selector); }, _feed.num_buckets(), "get");
+ auto remove_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_remove_feed(range, bucket_selector); }, _feed.num_buckets(), "remove");
+ BmNodeStatsReporter reporter(*_cluster);
reporter.start(500ms);
int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
- benchmark_feed(feeder, time_bias, put_feed, bm_params, bm_params.get_put_passes(), "put");
+ benchmark_feed(feeder, time_bias, put_feed, _params.get_put_passes(), "put");
reporter.report_now();
- benchmark_feed(feeder, time_bias, update_feed, bm_params, bm_params.get_update_passes(), "update");
+ benchmark_feed(feeder, time_bias, update_feed, _params.get_update_passes(), "update");
reporter.report_now();
- benchmark_feed(feeder, time_bias, get_feed, bm_params, bm_params.get_get_passes(), "get");
+ benchmark_feed(feeder, time_bias, get_feed, _params.get_get_passes(), "get");
reporter.report_now();
- benchmark_feed(feeder, time_bias, remove_feed, bm_params, bm_params.get_remove_passes(), "remove");
+ benchmark_feed(feeder, time_bias, remove_feed, _params.get_remove_passes(), "remove");
reporter.report_now();
reporter.stop();
LOG(info, "--------------------------------");
- cluster.stop();
+ _cluster->stop();
}
class App : public FastOS_Application
@@ -382,7 +382,9 @@ App::Main()
usage();
return 1;
}
- benchmark(_bm_params);
+ vespalib::rmdir(base_dir, true);
+ Benchmark bm(_bm_params);
+ bm.run();
return 0;
}
diff --git a/searchcore/src/apps/vespa-redistribute-bm/.gitignore b/searchcore/src/apps/vespa-redistribute-bm/.gitignore
new file mode 100644
index 00000000000..4a7424f7ef4
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/.gitignore
@@ -0,0 +1 @@
+vespa-redistribute-bm
diff --git a/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt
new file mode 100644
index 00000000000..5b34c1aefb8
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_vespa_redistribute_bm_app
+ SOURCES
+ vespa_redistribute_bm.cpp
+ OUTPUT_NAME vespa-redistribute-bm
+ DEPENDS
+ searchcore_bmcluster
+)
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
new file mode 100644
index 00000000000..34cd091b615
--- /dev/null
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -0,0 +1,627 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/repo/configbuilder.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/fastos/app.h>
+#include <vespa/searchcore/bmcluster/avg_sampler.h>
+#include <vespa/searchcore/bmcluster/bm_cluster.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_controller.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_params.h>
+#include <vespa/searchcore/bmcluster/bm_distribution.h>
+#include <vespa/searchcore/bmcluster/bm_feed.h>
+#include <vespa/searchcore/bmcluster/bm_feeder.h>
+#include <vespa/searchcore/bmcluster/bm_feed_params.h>
+#include <vespa/searchcore/bmcluster/bm_node.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats.h>
+#include <vespa/searchcore/bmcluster/bm_node_stats_reporter.h>
+#include <vespa/searchcore/bmcluster/bm_range.h>
+#include <vespa/searchcore/bmcluster/bucket_selector.h>
+#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h>
+#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <getopt.h>
+#include <iostream>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP("vespa-redistribute-bm");
+
+using namespace proton;
+using namespace std::chrono_literals;
+
+using document::DocumentTypeRepo;
+using document::DocumentTypeRepoFactory;
+using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
+using search::bmcluster::AvgSampler;
+using search::bmcluster::BmClusterController;
+using search::bmcluster::IBmFeedHandler;
+using search::bmcluster::BmClusterParams;
+using search::bmcluster::BmCluster;
+using search::bmcluster::BmFeed;
+using search::bmcluster::BmFeedParams;
+using search::bmcluster::BmFeeder;
+using search::bmcluster::BmNode;
+using search::bmcluster::BmNodeStatsReporter;
+using search::bmcluster::BmRange;
+using search::bmcluster::BucketSelector;
+using search::index::DummyFileHeaderContext;
+using storage::lib::State;
+
+namespace {
+
+vespalib::string base_dir = "testdb";
+constexpr int base_port = 9017;
+
+std::shared_ptr<DocumenttypesConfig> make_document_types() {
+ using Struct = document::config_builder::Struct;
+ using DataType = document::DataType;
+ document::config_builder::DocumenttypesConfigBuilderHelper builder;
+ builder.document(42, "test", Struct("test.header").addField("int", DataType::T_INT), Struct("test.body"));
+ return std::make_shared<DocumenttypesConfig>(builder.config());
+}
+
+enum class Mode {
+ GROW,
+ SHRINK,
+ PERM_CRASH,
+ TEMP_CRASH,
+ REPLACE,
+ BAD,
+};
+
+std::vector<vespalib::string> mode_names = {
+ "grow",
+ "shrink",
+ "perm-crash",
+ "temp-crash",
+ "replace"
+};
+
+vespalib::string bad_mode_name("bad");
+
+Mode get_mode(const vespalib::string& mode_name) {
+ for (uint32_t i = 0; i < mode_names.size(); ++i) {
+ if (mode_name == mode_names[i]) {
+ return static_cast<Mode>(i);
+ }
+ }
+ return Mode::BAD;
+}
+
+vespalib::string& get_mode_name(Mode mode) {
+ uint32_t i = static_cast<uint32_t>(mode);
+ return (i < mode_names.size()) ? mode_names[i] : bad_mode_name;
+}
+
+double
+estimate_lost_docs_base_ratio(uint32_t redundancy, uint32_t lost_nodes, uint32_t num_nodes)
+{
+ if (redundancy > lost_nodes) {
+ return 0.0;
+ }
+ double loss_ratio = 1.0;
+ for (uint32_t i = 0; i < redundancy; ++i) {
+ loss_ratio *= ((double) (lost_nodes - i)) / (num_nodes - i);
+ }
+ LOG(info, "estimated lost docs base ratio: %4.2f", loss_ratio);
+ return loss_ratio;
+}
+
+double
+estimate_moved_docs_ratio_grow(uint32_t redundancy, uint32_t added_nodes, uint32_t num_nodes)
+{
+ double new_redundancy = redundancy;
+ double new_per_node_doc_ratio = new_redundancy / num_nodes;
+ double moved_ratio = new_per_node_doc_ratio * added_nodes;
+ LOG(info, "estimated_moved_docs_ratio_grow(%u,%u,%u)=%4.2f", redundancy, added_nodes, num_nodes, moved_ratio);
+ return moved_ratio;
+}
+
+double
+estimate_moved_docs_ratio_shrink(uint32_t redundancy, uint32_t retired_nodes, uint32_t num_nodes)
+{
+ double old_redundancy = redundancy;
+ double old_per_node_doc_ratio = old_redundancy / num_nodes;
+ uint32_t new_nodes = num_nodes - retired_nodes;
+ double new_redundancy = std::min(redundancy, new_nodes);
+ double new_per_node_doc_ratio = new_redundancy / new_nodes;
+ double moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * new_nodes;
+ LOG(info, "estimated_moved_docs_ratio_shrink(%u,%u,%u)=%4.2f", redundancy, retired_nodes, num_nodes, moved_ratio);
+ return moved_ratio;
+}
+
+double
+estimate_moved_nodes_ratio_crash(uint32_t redundancy, uint32_t crashed_nodes, uint32_t num_nodes)
+{
+ double old_redundancy = redundancy;
+ double old_per_node_doc_ratio = old_redundancy / num_nodes;
+ uint32_t new_nodes = num_nodes - crashed_nodes;
+ double new_redundancy = std::min(redundancy, new_nodes);
+ double new_per_node_doc_ratio = new_redundancy / new_nodes;
+ double lost_docs_ratio = estimate_lost_docs_base_ratio(redundancy, crashed_nodes, num_nodes) * new_redundancy;
+ double moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * new_nodes - lost_docs_ratio;
+ LOG(info, "estimated_moved_docs_ratio_crash(%u,%u,%u)=%4.2f", redundancy, crashed_nodes, num_nodes, moved_ratio);
+ return moved_ratio;
+}
+
+double
+estimate_moved_nodes_ratio_replace(uint32_t redundancy, uint32_t added_nodes, uint32_t retired_nodes, uint32_t num_nodes)
+{
+ uint32_t old_nodes = num_nodes - added_nodes;
+ double old_redundancy = std::min(redundancy, old_nodes);
+ double old_per_node_doc_ratio = old_redundancy / old_nodes;
+ uint32_t new_nodes = num_nodes - retired_nodes;
+ double new_redundancy = std::min(redundancy, new_nodes);
+ double new_per_node_doc_ratio = new_redundancy / new_nodes;
+ double moved_ratio = new_per_node_doc_ratio * added_nodes;
+ uint32_t stable_nodes = num_nodes - added_nodes - retired_nodes;
+ // Account for extra documents moved from retired nodes to stable nodes
+ // TODO: Fix calculation
+ double extra_moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * stable_nodes;
+ extra_moved_ratio += new_per_node_doc_ratio * stable_nodes * retired_nodes / new_nodes;
+ moved_ratio += extra_moved_ratio;
+ LOG(info, "estimated_moved_docs_ratio_replace(%u,%u,%u,%u)=%4.2f, (of which %4.2f extra)", redundancy, added_nodes, retired_nodes, num_nodes, moved_ratio, extra_moved_ratio);
+ return moved_ratio;
+}
+
+class BMParams : public BmClusterParams,
+ public BmFeedParams
+{
+ uint32_t _flip_nodes;
+ Mode _mode;
+ bool _use_feed_settle;
+public:
+ BMParams();
+ uint32_t get_flip_nodes() const noexcept { return _flip_nodes; }
+ Mode get_mode() const noexcept { return _mode; }
+ bool get_use_feed_settle() const noexcept { return _use_feed_settle; }
+ void set_flip_nodes(uint32_t value) { _flip_nodes = value; }
+ void set_mode(Mode value) { _mode = value; }
+ void set_use_feed_settle(bool value) { _use_feed_settle = value; }
+ bool check() const;
+};
+
+BMParams::BMParams()
+ : BmClusterParams(),
+ BmFeedParams(),
+ _flip_nodes(1u),
+ _mode(Mode::GROW),
+ _use_feed_settle(false)
+{
+ set_enable_service_layer(true);
+ set_enable_distributor(true);
+ set_use_document_api(true);
+ set_num_nodes(4);
+}
+
+
+bool
+BMParams::check() const
+{
+ if (!BmClusterParams::check()) {
+ return false;
+ }
+ if (!BmFeedParams::check()) {
+ return false;
+ }
+ if (get_num_nodes() < 2u) {
+ std::cerr << "Too few nodes: " << get_num_nodes() << std::endl;
+ return false;
+ }
+ if (_mode == Mode::REPLACE) {
+ if (_flip_nodes * 2 > get_num_nodes()) {
+ std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (replace mode)" << std::endl;
+ return false;
+ }
+ } else {
+ if (_flip_nodes >= get_num_nodes()) {
+ std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (" << get_mode_name(_mode) << " mode)" << std::endl;
+ return false;
+ }
+ }
+ if (_mode == Mode::BAD) {
+ std::cerr << "Bad mode" << std::endl;
+ return false;
+ }
+ return true;
+}
+
+}
+
+class Benchmark {
+ BMParams _params;
+ std::shared_ptr<const DocumenttypesConfig> _document_types;
+ std::shared_ptr<const DocumentTypeRepo> _repo;
+ std::unique_ptr<BmCluster> _cluster;
+ BmFeed _feed;
+
+ void adjust_cluster_state_before_feed();
+ void adjust_cluster_state_after_feed();
+ void adjust_cluster_state_after_first_redistribution();
+ double estimate_lost_docs();
+ double estimate_moved_docs();
+ void feed();
+ std::chrono::duration<double> redistribute();
+
+public:
+ explicit Benchmark(const BMParams& params);
+ ~Benchmark();
+ void run();
+};
+
+Benchmark::Benchmark(const BMParams& params)
+ : _params(params),
+ _document_types(make_document_types()),
+ _repo(document::DocumentTypeRepoFactory::make(*_document_types)),
+ _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
+ _feed(_repo)
+{
+ _cluster->make_nodes();
+}
+
+Benchmark::~Benchmark() = default;
+
+void
+Benchmark::adjust_cluster_state_before_feed()
+{
+ auto& dist = _cluster->get_real_distribution();
+ auto& mode_name = get_mode_name(_params.get_mode());
+ switch (_params.get_mode()) {
+ case Mode::GROW:
+ case Mode::REPLACE:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::DOWN);
+ }
+ LOG(info, "Mode %s: Taking down %u node(s) initially", mode_name.c_str(), _params.get_flip_nodes());
+ break;
+ default:
+ LOG(info, "Mode %s: No cluster state adjust before feed", mode_name.c_str());
+ }
+ dist.commit_cluster_state_change();
+}
+
+void
+Benchmark::adjust_cluster_state_after_feed()
+{
+ auto& dist = _cluster->get_real_distribution();
+ auto& mode_name = get_mode_name(_params.get_mode());
+ switch (_params.get_mode()) {
+ case Mode::GROW:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::UP);
+ }
+ LOG(info, "Mode %s: taking up %u node(s)", mode_name.c_str(), _params.get_flip_nodes());
+ break;
+ case Mode::SHRINK:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::RETIRED);
+ }
+ LOG(info, "Mode %s: Retiring %u node(s)", mode_name.c_str(), _params.get_flip_nodes());
+ break;
+ case Mode::PERM_CRASH:
+ case Mode::TEMP_CRASH:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::DOWN);
+ }
+ LOG(info, "Mode %s: taking down %u node(s)", mode_name.c_str(), _params.get_flip_nodes());
+ break;
+ case Mode::REPLACE:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::UP);
+ }
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i + _params.get_flip_nodes(), State::RETIRED);
+ }
+ LOG(info, "Mode %s: Taking up %u node(s) and retiring %u node(s)", mode_name.c_str(), _params.get_flip_nodes(), _params.get_flip_nodes());
+ break;
+ default:
+ LOG(info, "Mode %s: No cluster state adjust after feed", mode_name.c_str());
+ }
+ dist.commit_cluster_state_change();
+}
+
+void
+Benchmark::adjust_cluster_state_after_first_redistribution()
+{
+ auto& dist = _cluster->get_real_distribution();
+ auto& mode_name = get_mode_name(_params.get_mode());
+ switch (_params.get_mode()) {
+ case Mode::TEMP_CRASH:
+ for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) {
+ dist.set_node_state(i, State::UP);
+ }
+ LOG(info, "Mode %s: taking up %u node(s)", mode_name.c_str(), _params.get_flip_nodes());
+ break;
+ default:
+ LOG(info, "Mode %s: No cluster state adjust after first redistribution", mode_name.c_str());
+ }
+ dist.commit_cluster_state_change();
+}
+
+void
+Benchmark::feed()
+{
+ vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+ BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
+ auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
+ BmNodeStatsReporter reporter(*_cluster);
+ reporter.start(500ms);
+ int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
+ LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
+ AvgSampler sampler;
+ feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put");
+ reporter.report_now();
+ if (_params.get_use_feed_settle()) {
+ LOG(info, "Settling feed");
+ std::this_thread::sleep_for(2s);
+ reporter.report_now();
+ }
+}
+
+
+std::chrono::duration<double>
+Benchmark::redistribute()
+{
+ BmNodeStatsReporter reporter(*_cluster);
+ auto before = std::chrono::steady_clock::now();
+ reporter.start(500ms);
+ _cluster->propagate_cluster_state();
+ reporter.report_now();
+ for (;;) {
+ auto duration = std::chrono::steady_clock::now() - reporter.get_change_time();
+ if (duration >= 6s) {
+ break;
+ }
+ std::this_thread::sleep_for(100ms);
+ }
+ return reporter.get_change_time() - before;
+}
+
+double
+Benchmark::estimate_lost_docs()
+{
+ switch (_params.get_mode()) {
+ case Mode::PERM_CRASH:
+ case Mode::TEMP_CRASH:
+ {
+ double new_redundancy = std::min(_params.get_redundancy(), _params.get_num_nodes() - _params.get_flip_nodes());
+ auto lost_docs_ratio = estimate_lost_docs_base_ratio(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes()) * new_redundancy;
+ return _params.get_documents() * lost_docs_ratio;
+ }
+ default:
+ return 0.0;
+ }
+}
+
+double
+Benchmark::estimate_moved_docs()
+{
+ switch(_params.get_mode()) {
+ case Mode::GROW:
+ return _params.get_documents() * estimate_moved_docs_ratio_grow(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes());
+ case Mode::SHRINK:
+ return _params.get_documents() * estimate_moved_docs_ratio_shrink(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes());
+ case Mode::PERM_CRASH:
+ case Mode::TEMP_CRASH:
+ return _params.get_documents() * estimate_moved_nodes_ratio_crash(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes());
+ case Mode::REPLACE:
+ return _params.get_documents() * estimate_moved_nodes_ratio_replace(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_flip_nodes(), _params.get_num_nodes());
+ default:
+ return 0.0;
+ }
+}
+
+void
+Benchmark::run()
+{
+ adjust_cluster_state_before_feed();
+ _cluster->start(_feed);
+ feed();
+ LOG(info, "--------------------------------");
+ adjust_cluster_state_after_feed();
+ auto elapsed = redistribute();
+ double moved_docs = estimate_moved_docs();
+ double lost_docs = estimate_lost_docs();
+ LOG(info, "Redistributed estimated %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f lost docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs);
+ if (_params.get_mode() == Mode::TEMP_CRASH) {
+ if (_params.get_use_feed_settle()) {
+ LOG(info, "Settling redistribution");
+ std::this_thread::sleep_for(2s);
+ }
+ adjust_cluster_state_after_first_redistribution();
+ elapsed = redistribute();
+ LOG(info, "Cleanup of %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f refound docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs);
+ }
+ _cluster->stop();
+}
+
+class App : public FastOS_Application
+{
+ BMParams _bm_params;
+public:
+ App();
+ ~App() override;
+ void usage();
+ bool get_options();
+ int Main() override;
+};
+
+App::App()
+ : _bm_params()
+{
+}
+
+App::~App() = default;
+
+void
+App::usage()
+{
+ std::cerr <<
+ "vespa-redistribute-bm version 0.0\n"
+ "\n"
+ "USAGE:\n";
+ std::cerr <<
+ "vespa-redistribute-bm\n"
+ "[--bucket-db-stripe-bits bits]\n"
+ "[--client-threads threads]\n"
+ "[--distributor-stripes stripes]\n"
+ "[--documents documents]\n"
+ "[--flip-nodes flip-nodes]\n"
+ "[--indexing-sequencer [latency,throughput,adaptive]]\n"
+ "[--max-pending max-pending]\n"
+ "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n"
+ "[--nodes nodes]\n"
+ "[--redundancy redundancy]\n"
+ "[--rpc-events-before-wakeup events]\n"
+ "[--rpc-network-threads threads]\n"
+ "[--rpc-targets-per-node targets]\n"
+ "[--response-threads threads]\n"
+ "[--skip-communicationmanager-thread]\n"
+ "[--use-async-message-handling]\n"
+ "[--use-feed-settle]" << std::endl;
+}
+
+bool
+App::get_options()
+{
+ int c;
+ const char *opt_argument = nullptr;
+ int long_opt_index = 0;
+ static struct option long_opts[] = {
+ { "bucket-db-stripe-bits", 1, nullptr, 0 },
+ { "client-threads", 1, nullptr, 0 },
+ { "distributor-stripes", 1, nullptr, 0 },
+ { "documents", 1, nullptr, 0 },
+ { "flip-nodes", 1, nullptr, 0 },
+ { "indexing-sequencer", 1, nullptr, 0 },
+ { "max-pending", 1, nullptr, 0 },
+ { "mode", 1, nullptr, 0 },
+ { "nodes", 1, nullptr, 0 },
+ { "redundancy", 1, nullptr, 0 },
+ { "response-threads", 1, nullptr, 0 },
+ { "rpc-events-before-wakeup", 1, nullptr, 0 },
+ { "rpc-network-threads", 1, nullptr, 0 },
+ { "rpc-targets-per-node", 1, nullptr, 0 },
+ { "skip-communicationmanager-thread", 0, nullptr, 0 },
+ { "use-async-message-handling", 0, nullptr, 0 },
+ { "use-feed-settle", 0, nullptr, 0 }
+ };
+ enum longopts_enum {
+ LONGOPT_BUCKET_DB_STRIPE_BITS,
+ LONGOPT_CLIENT_THREADS,
+ LONGOPT_DISTRIBUTOR_STRIPES,
+ LONGOPT_DOCUMENTS,
+ LONGOPT_FLIP_NODES,
+ LONGOPT_INDEXING_SEQUENCER,
+ LONGOPT_MAX_PENDING,
+ LONGOPT_MODE,
+ LONGOPT_NODES,
+ LONGOPT_REDUNDANCY,
+ LONGOPT_RESPONSE_THREADS,
+ LONGOPT_RPC_EVENTS_BEFORE_WAKEUP,
+ LONGOPT_RPC_NETWORK_THREADS,
+ LONGOPT_RPC_TARGETS_PER_NODE,
+ LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD,
+ LONGOPT_USE_ASYNC_MESSAGE_HANDLING,
+ LONGOPT_USE_FEED_SETTLE
+ };
+ int opt_index = 1;
+ resetOptIndex(opt_index);
+ while ((c = GetOptLong("", opt_argument, opt_index, long_opts, &long_opt_index)) != -1) {
+ switch (c) {
+ case 0:
+ switch(long_opt_index) {
+ case LONGOPT_BUCKET_DB_STRIPE_BITS:
+ _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument));
+ break;
+ case LONGOPT_CLIENT_THREADS:
+ _bm_params.set_client_threads(atoi(opt_argument));
+ break;
+ case LONGOPT_DISTRIBUTOR_STRIPES:
+ _bm_params.set_distributor_stripes(atoi(opt_argument));
+ break;
+ case LONGOPT_DOCUMENTS:
+ _bm_params.set_documents(atoi(opt_argument));
+ break;
+ case LONGOPT_FLIP_NODES:
+ _bm_params.set_flip_nodes(atoi(opt_argument));
+ break;
+ case LONGOPT_INDEXING_SEQUENCER:
+ _bm_params.set_indexing_sequencer(opt_argument);
+ break;
+ case LONGOPT_MAX_PENDING:
+ _bm_params.set_max_pending(atoi(opt_argument));
+ break;
+ case LONGOPT_MODE:
+ _bm_params.set_mode(get_mode(opt_argument));
+ if (_bm_params.get_mode() == Mode::BAD) {
+ std::cerr << "Unknown mode name " << opt_argument << std::endl;
+ }
+ break;
+ case LONGOPT_NODES:
+ _bm_params.set_num_nodes(atoi(opt_argument));
+ break;
+ case LONGOPT_REDUNDANCY:
+ _bm_params.set_redundancy(atoi(opt_argument));
+ break;
+ case LONGOPT_RESPONSE_THREADS:
+ _bm_params.set_response_threads(atoi(opt_argument));
+ break;
+ case LONGOPT_RPC_EVENTS_BEFORE_WAKEUP:
+ _bm_params.set_rpc_events_before_wakeup(atoi(opt_argument));
+ break;
+ case LONGOPT_RPC_NETWORK_THREADS:
+ _bm_params.set_rpc_network_threads(atoi(opt_argument));
+ break;
+ case LONGOPT_RPC_TARGETS_PER_NODE:
+ _bm_params.set_rpc_targets_per_node(atoi(opt_argument));
+ break;
+ case LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD:
+ _bm_params.set_skip_communicationmanager_thread(true);
+ break;
+ case LONGOPT_USE_ASYNC_MESSAGE_HANDLING:
+ _bm_params.set_use_async_message_handling_on_schedule(true);
+ break;
+ case LONGOPT_USE_FEED_SETTLE:
+ _bm_params.set_use_feed_settle(true);
+ break;
+ default:
+ return false;
+ }
+ break;
+ default:
+ return false;
+ }
+ }
+ return _bm_params.check();
+}
+
+int
+App::Main()
+{
+ if (!get_options()) {
+ usage();
+ return 1;
+ }
+ vespalib::rmdir(base_dir, true);
+ Benchmark bm(_bm_params);
+ bm.run();
+ return 0;
+}
+
+int
+main(int argc, char* argv[])
+{
+ DummyFileHeaderContext::setCreator("vespa-redistribute-bm");
+ App app;
+ auto exit_value = app.Entry(argc, argv);
+ vespalib::rmdir(base_dir, true);
+ return exit_value;
+}