diff options
13 files changed, 769 insertions, 46 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index c76f35bd9ff..c0353747e80 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -50,6 +50,7 @@ vespa_define_module( src/apps/vespa-feed-bm src/apps/vespa-gen-testdocs src/apps/vespa-proton-cmd + src/apps/vespa-redistribute-bm src/apps/vespa-transactionlog-inspect TESTS diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 7f9ae442dd3..3dac66685f9 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -51,8 +51,6 @@ using search::bmcluster::BmNodeStatsReporter; using search::bmcluster::BmRange; using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; -using storage::spi::PersistenceProvider; -using vespalib::makeLambdaTask; namespace { @@ -114,71 +112,73 @@ BMParams::check() const } -struct PersistenceProviderFixture { +class Benchmark { + BMParams _params; std::shared_ptr<const DocumenttypesConfig> _document_types; std::shared_ptr<const DocumentTypeRepo> _repo; - std::unique_ptr<BmCluster> _bm_cluster; + std::unique_ptr<BmCluster> _cluster; BmFeed _feed; - explicit PersistenceProviderFixture(const BMParams& params); - ~PersistenceProviderFixture(); + void benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name); +public: + explicit Benchmark(const BMParams& params); + ~Benchmark(); + void run(); }; -PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) - : _document_types(make_document_types()), +Benchmark::Benchmark(const BMParams& params) + : _params(params), + _document_types(make_document_types()), _repo(document::DocumentTypeRepoFactory::make(*_document_types)), - _bm_cluster(std::make_unique<BmCluster>(base_dir, base_port, params, _document_types, _repo)), + _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)), _feed(_repo) { - _bm_cluster->make_nodes(); + _cluster->make_nodes(); } -PersistenceProviderFixture::~PersistenceProviderFixture() = default; +Benchmark::~Benchmark() = default; void -benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params, uint32_t passes, const vespalib::string &op_name) +Benchmark::benchmark_feed(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, uint32_t passes, const vespalib::string &op_name) { if (passes == 0) { return; } AvgSampler sampler; LOG(info, "--------------------------------"); - LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), params.get_documents(), passes); + LOG(info, "%sAsync: %u small documents, passes=%u", op_name.c_str(), _params.get_documents(), passes); for (uint32_t pass = 0; pass < passes; ++pass) { - feeder.run_feed_tasks(pass, time_bias, serialized_feed, params, sampler, op_name); + feeder.run_feed_tasks(pass, time_bias, serialized_feed, _params, sampler, op_name); } LOG(info, "%sAsync: AVG %s/s: %8.2f", op_name.c_str(), op_name.c_str(), sampler.avg()); } -void benchmark(const BMParams &bm_params) +void +Benchmark::run() { - vespalib::rmdir(base_dir, true); - PersistenceProviderFixture f(bm_params); - auto& cluster = *f._bm_cluster; - cluster.start(f._feed); - vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki); - BmFeeder feeder(f._repo, *cluster.get_feed_handler(), executor); - auto& feed = f._feed; - auto put_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put"); - auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); - auto get_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_get_feed(range, bucket_selector); }, f._feed.num_buckets(), "get"); - auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); - BmNodeStatsReporter reporter(cluster); + _cluster->start(_feed); + vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); + BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor); + auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); + auto update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update"); + auto get_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_get_feed(range, bucket_selector); }, _feed.num_buckets(), "get"); + auto remove_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_remove_feed(range, bucket_selector); }, _feed.num_buckets(), "remove"); + BmNodeStatsReporter reporter(*_cluster); reporter.start(500ms); int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count(); LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); - benchmark_feed(feeder, time_bias, put_feed, bm_params, bm_params.get_put_passes(), "put"); + benchmark_feed(feeder, time_bias, put_feed, _params.get_put_passes(), "put"); reporter.report_now(); - benchmark_feed(feeder, time_bias, update_feed, bm_params, bm_params.get_update_passes(), "update"); + benchmark_feed(feeder, time_bias, update_feed, _params.get_update_passes(), "update"); reporter.report_now(); - benchmark_feed(feeder, time_bias, get_feed, bm_params, bm_params.get_get_passes(), "get"); + benchmark_feed(feeder, time_bias, get_feed, _params.get_get_passes(), "get"); reporter.report_now(); - benchmark_feed(feeder, time_bias, remove_feed, bm_params, bm_params.get_remove_passes(), "remove"); + benchmark_feed(feeder, time_bias, remove_feed, _params.get_remove_passes(), "remove"); reporter.report_now(); reporter.stop(); LOG(info, "--------------------------------"); - cluster.stop(); + _cluster->stop(); } class App : public FastOS_Application @@ -382,7 +382,9 @@ App::Main() usage(); return 1; } - benchmark(_bm_params); + vespalib::rmdir(base_dir, true); + Benchmark bm(_bm_params); + bm.run(); return 0; } diff --git a/searchcore/src/apps/vespa-redistribute-bm/.gitignore b/searchcore/src/apps/vespa-redistribute-bm/.gitignore new file mode 100644 index 00000000000..4a7424f7ef4 --- /dev/null +++ b/searchcore/src/apps/vespa-redistribute-bm/.gitignore @@ -0,0 +1 @@ +vespa-redistribute-bm diff --git a/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt new file mode 100644 index 00000000000..5b34c1aefb8 --- /dev/null +++ b/searchcore/src/apps/vespa-redistribute-bm/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_vespa_redistribute_bm_app + SOURCES + vespa_redistribute_bm.cpp + OUTPUT_NAME vespa-redistribute-bm + DEPENDS + searchcore_bmcluster +) diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp new file mode 100644 index 00000000000..34cd091b615 --- /dev/null +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -0,0 +1,627 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/document/repo/configbuilder.h> +#include <vespa/document/repo/document_type_repo_factory.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/fastos/app.h> +#include <vespa/searchcore/bmcluster/avg_sampler.h> +#include <vespa/searchcore/bmcluster/bm_cluster.h> +#include <vespa/searchcore/bmcluster/bm_cluster_controller.h> +#include <vespa/searchcore/bmcluster/bm_cluster_params.h> +#include <vespa/searchcore/bmcluster/bm_distribution.h> +#include <vespa/searchcore/bmcluster/bm_feed.h> +#include <vespa/searchcore/bmcluster/bm_feeder.h> +#include <vespa/searchcore/bmcluster/bm_feed_params.h> +#include <vespa/searchcore/bmcluster/bm_node.h> +#include <vespa/searchcore/bmcluster/bm_node_stats.h> +#include <vespa/searchcore/bmcluster/bm_node_stats_reporter.h> +#include <vespa/searchcore/bmcluster/bm_range.h> +#include <vespa/searchcore/bmcluster/bucket_selector.h> +#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h> +#include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <getopt.h> +#include <iostream> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP("vespa-redistribute-bm"); + +using namespace proton; +using namespace std::chrono_literals; + +using document::DocumentTypeRepo; +using document::DocumentTypeRepoFactory; +using document::DocumenttypesConfig; +using document::DocumenttypesConfigBuilder; +using search::bmcluster::AvgSampler; +using search::bmcluster::BmClusterController; +using search::bmcluster::IBmFeedHandler; +using search::bmcluster::BmClusterParams; +using search::bmcluster::BmCluster; +using search::bmcluster::BmFeed; +using search::bmcluster::BmFeedParams; +using search::bmcluster::BmFeeder; +using search::bmcluster::BmNode; +using search::bmcluster::BmNodeStatsReporter; +using search::bmcluster::BmRange; +using search::bmcluster::BucketSelector; +using search::index::DummyFileHeaderContext; +using storage::lib::State; + +namespace { + +vespalib::string base_dir = "testdb"; +constexpr int base_port = 9017; + +std::shared_ptr<DocumenttypesConfig> make_document_types() { + using Struct = document::config_builder::Struct; + using DataType = document::DataType; + document::config_builder::DocumenttypesConfigBuilderHelper builder; + builder.document(42, "test", Struct("test.header").addField("int", DataType::T_INT), Struct("test.body")); + return std::make_shared<DocumenttypesConfig>(builder.config()); +} + +enum class Mode { + GROW, + SHRINK, + PERM_CRASH, + TEMP_CRASH, + REPLACE, + BAD, +}; + +std::vector<vespalib::string> mode_names = { + "grow", + "shrink", + "perm-crash", + "temp-crash", + "replace" +}; + +vespalib::string bad_mode_name("bad"); + +Mode get_mode(const vespalib::string& mode_name) { + for (uint32_t i = 0; i < mode_names.size(); ++i) { + if (mode_name == mode_names[i]) { + return static_cast<Mode>(i); + } + } + return Mode::BAD; +} + +vespalib::string& get_mode_name(Mode mode) { + uint32_t i = static_cast<uint32_t>(mode); + return (i < mode_names.size()) ? mode_names[i] : bad_mode_name; +} + +double +estimate_lost_docs_base_ratio(uint32_t redundancy, uint32_t lost_nodes, uint32_t num_nodes) +{ + if (redundancy > lost_nodes) { + return 0.0; + } + double loss_ratio = 1.0; + for (uint32_t i = 0; i < redundancy; ++i) { + loss_ratio *= ((double) (lost_nodes - i)) / (num_nodes - i); + } + LOG(info, "estimated lost docs base ratio: %4.2f", loss_ratio); + return loss_ratio; +} + +double +estimate_moved_docs_ratio_grow(uint32_t redundancy, uint32_t added_nodes, uint32_t num_nodes) +{ + double new_redundancy = redundancy; + double new_per_node_doc_ratio = new_redundancy / num_nodes; + double moved_ratio = new_per_node_doc_ratio * added_nodes; + LOG(info, "estimated_moved_docs_ratio_grow(%u,%u,%u)=%4.2f", redundancy, added_nodes, num_nodes, moved_ratio); + return moved_ratio; +} + +double +estimate_moved_docs_ratio_shrink(uint32_t redundancy, uint32_t retired_nodes, uint32_t num_nodes) +{ + double old_redundancy = redundancy; + double old_per_node_doc_ratio = old_redundancy / num_nodes; + uint32_t new_nodes = num_nodes - retired_nodes; + double new_redundancy = std::min(redundancy, new_nodes); + double new_per_node_doc_ratio = new_redundancy / new_nodes; + double moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * new_nodes; + LOG(info, "estimated_moved_docs_ratio_shrink(%u,%u,%u)=%4.2f", redundancy, retired_nodes, num_nodes, moved_ratio); + return moved_ratio; +} + +double +estimate_moved_nodes_ratio_crash(uint32_t redundancy, uint32_t crashed_nodes, uint32_t num_nodes) +{ + double old_redundancy = redundancy; + double old_per_node_doc_ratio = old_redundancy / num_nodes; + uint32_t new_nodes = num_nodes - crashed_nodes; + double new_redundancy = std::min(redundancy, new_nodes); + double new_per_node_doc_ratio = new_redundancy / new_nodes; + double lost_docs_ratio = estimate_lost_docs_base_ratio(redundancy, crashed_nodes, num_nodes) * new_redundancy; + double moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * new_nodes - lost_docs_ratio; + LOG(info, "estimated_moved_docs_ratio_crash(%u,%u,%u)=%4.2f", redundancy, crashed_nodes, num_nodes, moved_ratio); + return moved_ratio; +} + +double +estimate_moved_nodes_ratio_replace(uint32_t redundancy, uint32_t added_nodes, uint32_t retired_nodes, uint32_t num_nodes) +{ + uint32_t old_nodes = num_nodes - added_nodes; + double old_redundancy = std::min(redundancy, old_nodes); + double old_per_node_doc_ratio = old_redundancy / old_nodes; + uint32_t new_nodes = num_nodes - retired_nodes; + double new_redundancy = std::min(redundancy, new_nodes); + double new_per_node_doc_ratio = new_redundancy / new_nodes; + double moved_ratio = new_per_node_doc_ratio * added_nodes; + uint32_t stable_nodes = num_nodes - added_nodes - retired_nodes; + // Account for extra documents moved from retired nodes to stable nodes + // TODO: Fix calculation + double extra_moved_ratio = (new_per_node_doc_ratio - old_per_node_doc_ratio) * stable_nodes; + extra_moved_ratio += new_per_node_doc_ratio * stable_nodes * retired_nodes / new_nodes; + moved_ratio += extra_moved_ratio; + LOG(info, "estimated_moved_docs_ratio_replace(%u,%u,%u,%u)=%4.2f, (of which %4.2f extra)", redundancy, added_nodes, retired_nodes, num_nodes, moved_ratio, extra_moved_ratio); + return moved_ratio; +} + +class BMParams : public BmClusterParams, + public BmFeedParams +{ + uint32_t _flip_nodes; + Mode _mode; + bool _use_feed_settle; +public: + BMParams(); + uint32_t get_flip_nodes() const noexcept { return _flip_nodes; } + Mode get_mode() const noexcept { return _mode; } + bool get_use_feed_settle() const noexcept { return _use_feed_settle; } + void set_flip_nodes(uint32_t value) { _flip_nodes = value; } + void set_mode(Mode value) { _mode = value; } + void set_use_feed_settle(bool value) { _use_feed_settle = value; } + bool check() const; +}; + +BMParams::BMParams() + : BmClusterParams(), + BmFeedParams(), + _flip_nodes(1u), + _mode(Mode::GROW), + _use_feed_settle(false) +{ + set_enable_service_layer(true); + set_enable_distributor(true); + set_use_document_api(true); + set_num_nodes(4); +} + + +bool +BMParams::check() const +{ + if (!BmClusterParams::check()) { + return false; + } + if (!BmFeedParams::check()) { + return false; + } + if (get_num_nodes() < 2u) { + std::cerr << "Too few nodes: " << get_num_nodes() << std::endl; + return false; + } + if (_mode == Mode::REPLACE) { + if (_flip_nodes * 2 > get_num_nodes()) { + std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (replace mode)" << std::endl; + return false; + } + } else { + if (_flip_nodes >= get_num_nodes()) { + std::cerr << "Too many flip nodes (" << _flip_nodes << ") with " << get_num_nodes() << " nodes (" << get_mode_name(_mode) << " mode)" << std::endl; + return false; + } + } + if (_mode == Mode::BAD) { + std::cerr << "Bad mode" << std::endl; + return false; + } + return true; +} + +} + +class Benchmark { + BMParams _params; + std::shared_ptr<const DocumenttypesConfig> _document_types; + std::shared_ptr<const DocumentTypeRepo> _repo; + std::unique_ptr<BmCluster> _cluster; + BmFeed _feed; + + void adjust_cluster_state_before_feed(); + void adjust_cluster_state_after_feed(); + void adjust_cluster_state_after_first_redistribution(); + double estimate_lost_docs(); + double estimate_moved_docs(); + void feed(); + std::chrono::duration<double> redistribute(); + +public: + explicit Benchmark(const BMParams& params); + ~Benchmark(); + void run(); +}; + +Benchmark::Benchmark(const BMParams& params) + : _params(params), + _document_types(make_document_types()), + _repo(document::DocumentTypeRepoFactory::make(*_document_types)), + _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)), + _feed(_repo) +{ + _cluster->make_nodes(); +} + +Benchmark::~Benchmark() = default; + +void +Benchmark::adjust_cluster_state_before_feed() +{ + auto& dist = _cluster->get_real_distribution(); + auto& mode_name = get_mode_name(_params.get_mode()); + switch (_params.get_mode()) { + case Mode::GROW: + case Mode::REPLACE: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::DOWN); + } + LOG(info, "Mode %s: Taking down %u node(s) initially", mode_name.c_str(), _params.get_flip_nodes()); + break; + default: + LOG(info, "Mode %s: No cluster state adjust before feed", mode_name.c_str()); + } + dist.commit_cluster_state_change(); +} + +void +Benchmark::adjust_cluster_state_after_feed() +{ + auto& dist = _cluster->get_real_distribution(); + auto& mode_name = get_mode_name(_params.get_mode()); + switch (_params.get_mode()) { + case Mode::GROW: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::UP); + } + LOG(info, "Mode %s: taking up %u node(s)", mode_name.c_str(), _params.get_flip_nodes()); + break; + case Mode::SHRINK: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::RETIRED); + } + LOG(info, "Mode %s: Retiring %u node(s)", mode_name.c_str(), _params.get_flip_nodes()); + break; + case Mode::PERM_CRASH: + case Mode::TEMP_CRASH: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::DOWN); + } + LOG(info, "Mode %s: taking down %u node(s)", mode_name.c_str(), _params.get_flip_nodes()); + break; + case Mode::REPLACE: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::UP); + } + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i + _params.get_flip_nodes(), State::RETIRED); + } + LOG(info, "Mode %s: Taking up %u node(s) and retiring %u node(s)", mode_name.c_str(), _params.get_flip_nodes(), _params.get_flip_nodes()); + break; + default: + LOG(info, "Mode %s: No cluster state adjust after feed", mode_name.c_str()); + } + dist.commit_cluster_state_change(); +} + +void +Benchmark::adjust_cluster_state_after_first_redistribution() +{ + auto& dist = _cluster->get_real_distribution(); + auto& mode_name = get_mode_name(_params.get_mode()); + switch (_params.get_mode()) { + case Mode::TEMP_CRASH: + for (uint32_t i = 0; i < _params.get_flip_nodes(); ++i) { + dist.set_node_state(i, State::UP); + } + LOG(info, "Mode %s: taking up %u node(s)", mode_name.c_str(), _params.get_flip_nodes()); + break; + default: + LOG(info, "Mode %s: No cluster state adjust after first redistribution", mode_name.c_str()); + } + dist.commit_cluster_state_change(); +} + +void +Benchmark::feed() +{ + vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); + BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor); + auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); + BmNodeStatsReporter reporter(*_cluster); + reporter.start(500ms); + int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count(); + LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); + AvgSampler sampler; + feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put"); + reporter.report_now(); + if (_params.get_use_feed_settle()) { + LOG(info, "Settling feed"); + std::this_thread::sleep_for(2s); + reporter.report_now(); + } +} + + +std::chrono::duration<double> +Benchmark::redistribute() +{ + BmNodeStatsReporter reporter(*_cluster); + auto before = std::chrono::steady_clock::now(); + reporter.start(500ms); + _cluster->propagate_cluster_state(); + reporter.report_now(); + for (;;) { + auto duration = std::chrono::steady_clock::now() - reporter.get_change_time(); + if (duration >= 6s) { + break; + } + std::this_thread::sleep_for(100ms); + } + return reporter.get_change_time() - before; +} + +double +Benchmark::estimate_lost_docs() +{ + switch (_params.get_mode()) { + case Mode::PERM_CRASH: + case Mode::TEMP_CRASH: + { + double new_redundancy = std::min(_params.get_redundancy(), _params.get_num_nodes() - _params.get_flip_nodes()); + auto lost_docs_ratio = estimate_lost_docs_base_ratio(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes()) * new_redundancy; + return _params.get_documents() * lost_docs_ratio; + } + default: + return 0.0; + } +} + +double +Benchmark::estimate_moved_docs() +{ + switch(_params.get_mode()) { + case Mode::GROW: + return _params.get_documents() * estimate_moved_docs_ratio_grow(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes()); + case Mode::SHRINK: + return _params.get_documents() * estimate_moved_docs_ratio_shrink(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes()); + case Mode::PERM_CRASH: + case Mode::TEMP_CRASH: + return _params.get_documents() * estimate_moved_nodes_ratio_crash(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_num_nodes()); + case Mode::REPLACE: + return _params.get_documents() * estimate_moved_nodes_ratio_replace(_params.get_redundancy(), _params.get_flip_nodes(), _params.get_flip_nodes(), _params.get_num_nodes()); + default: + return 0.0; + } +} + +void +Benchmark::run() +{ + adjust_cluster_state_before_feed(); + _cluster->start(_feed); + feed(); + LOG(info, "--------------------------------"); + adjust_cluster_state_after_feed(); + auto elapsed = redistribute(); + double moved_docs = estimate_moved_docs(); + double lost_docs = estimate_lost_docs(); + LOG(info, "Redistributed estimated %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f lost docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs); + if (_params.get_mode() == Mode::TEMP_CRASH) { + if (_params.get_use_feed_settle()) { + LOG(info, "Settling redistribution"); + std::this_thread::sleep_for(2s); + } + adjust_cluster_state_after_first_redistribution(); + elapsed = redistribute(); + LOG(info, "Cleanup of %4.2f docs in %5.3f seconds, %4.2f docs/s, estimated %4.2f refound docs", moved_docs, elapsed.count(), moved_docs / elapsed.count(), lost_docs); + } + _cluster->stop(); +} + +class App : public FastOS_Application +{ + BMParams _bm_params; +public: + App(); + ~App() override; + void usage(); + bool get_options(); + int Main() override; +}; + +App::App() + : _bm_params() +{ +} + +App::~App() = default; + +void +App::usage() +{ + std::cerr << + "vespa-redistribute-bm version 0.0\n" + "\n" + "USAGE:\n"; + std::cerr << + "vespa-redistribute-bm\n" + "[--bucket-db-stripe-bits bits]\n" + "[--client-threads threads]\n" + "[--distributor-stripes stripes]\n" + "[--documents documents]\n" + "[--flip-nodes flip-nodes]\n" + "[--indexing-sequencer [latency,throughput,adaptive]]\n" + "[--max-pending max-pending]\n" + "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n" + "[--nodes nodes]\n" + "[--redundancy redundancy]\n" + "[--rpc-events-before-wakeup events]\n" + "[--rpc-network-threads threads]\n" + "[--rpc-targets-per-node targets]\n" + "[--response-threads threads]\n" + "[--skip-communicationmanager-thread]\n" + "[--use-async-message-handling]\n" + "[--use-feed-settle]" << std::endl; +} + +bool +App::get_options() +{ + int c; + const char *opt_argument = nullptr; + int long_opt_index = 0; + static struct option long_opts[] = { + { "bucket-db-stripe-bits", 1, nullptr, 0 }, + { "client-threads", 1, nullptr, 0 }, + { "distributor-stripes", 1, nullptr, 0 }, + { "documents", 1, nullptr, 0 }, + { "flip-nodes", 1, nullptr, 0 }, + { "indexing-sequencer", 1, nullptr, 0 }, + { "max-pending", 1, nullptr, 0 }, + { "mode", 1, nullptr, 0 }, + { "nodes", 1, nullptr, 0 }, + { "redundancy", 1, nullptr, 0 }, + { "response-threads", 1, nullptr, 0 }, + { "rpc-events-before-wakeup", 1, nullptr, 0 }, + { "rpc-network-threads", 1, nullptr, 0 }, + { "rpc-targets-per-node", 1, nullptr, 0 }, + { "skip-communicationmanager-thread", 0, nullptr, 0 }, + { "use-async-message-handling", 0, nullptr, 0 }, + { "use-feed-settle", 0, nullptr, 0 } + }; + enum longopts_enum { + LONGOPT_BUCKET_DB_STRIPE_BITS, + LONGOPT_CLIENT_THREADS, + LONGOPT_DISTRIBUTOR_STRIPES, + LONGOPT_DOCUMENTS, + LONGOPT_FLIP_NODES, + LONGOPT_INDEXING_SEQUENCER, + LONGOPT_MAX_PENDING, + LONGOPT_MODE, + LONGOPT_NODES, + LONGOPT_REDUNDANCY, + LONGOPT_RESPONSE_THREADS, + LONGOPT_RPC_EVENTS_BEFORE_WAKEUP, + LONGOPT_RPC_NETWORK_THREADS, + LONGOPT_RPC_TARGETS_PER_NODE, + LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD, + LONGOPT_USE_ASYNC_MESSAGE_HANDLING, + LONGOPT_USE_FEED_SETTLE + }; + int opt_index = 1; + resetOptIndex(opt_index); + while ((c = GetOptLong("", opt_argument, opt_index, long_opts, &long_opt_index)) != -1) { + switch (c) { + case 0: + switch(long_opt_index) { + case LONGOPT_BUCKET_DB_STRIPE_BITS: + _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument)); + break; + case LONGOPT_CLIENT_THREADS: + _bm_params.set_client_threads(atoi(opt_argument)); + break; + case LONGOPT_DISTRIBUTOR_STRIPES: + _bm_params.set_distributor_stripes(atoi(opt_argument)); + break; + case LONGOPT_DOCUMENTS: + _bm_params.set_documents(atoi(opt_argument)); + break; + case LONGOPT_FLIP_NODES: + _bm_params.set_flip_nodes(atoi(opt_argument)); + break; + case LONGOPT_INDEXING_SEQUENCER: + _bm_params.set_indexing_sequencer(opt_argument); + break; + case LONGOPT_MAX_PENDING: + _bm_params.set_max_pending(atoi(opt_argument)); + break; + case LONGOPT_MODE: + _bm_params.set_mode(get_mode(opt_argument)); + if (_bm_params.get_mode() == Mode::BAD) { + std::cerr << "Unknown mode name " << opt_argument << std::endl; + } + break; + case LONGOPT_NODES: + _bm_params.set_num_nodes(atoi(opt_argument)); + break; + case LONGOPT_REDUNDANCY: + _bm_params.set_redundancy(atoi(opt_argument)); + break; + case LONGOPT_RESPONSE_THREADS: + _bm_params.set_response_threads(atoi(opt_argument)); + break; + case LONGOPT_RPC_EVENTS_BEFORE_WAKEUP: + _bm_params.set_rpc_events_before_wakeup(atoi(opt_argument)); + break; + case LONGOPT_RPC_NETWORK_THREADS: + _bm_params.set_rpc_network_threads(atoi(opt_argument)); + break; + case LONGOPT_RPC_TARGETS_PER_NODE: + _bm_params.set_rpc_targets_per_node(atoi(opt_argument)); + break; + case LONGOPT_SKIP_COMMUNICATIONMANAGER_THREAD: + _bm_params.set_skip_communicationmanager_thread(true); + break; + case LONGOPT_USE_ASYNC_MESSAGE_HANDLING: + _bm_params.set_use_async_message_handling_on_schedule(true); + break; + case LONGOPT_USE_FEED_SETTLE: + _bm_params.set_use_feed_settle(true); + break; + default: + return false; + } + break; + default: + return false; + } + } + return _bm_params.check(); +} + +int +App::Main() +{ + if (!get_options()) { + usage(); + return 1; + } + vespalib::rmdir(base_dir, true); + Benchmark bm(_bm_params); + bm.run(); + return 0; +} + +int +main(int argc, char* argv[]) +{ + DummyFileHeaderContext::setCreator("vespa-redistribute-bm"); + App app; + auto exit_value = app.Entry(argc, argv); + vespalib::rmdir(base_dir, true); + return exit_value; +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp index 36e1e394f40..9fc0b26fff5 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -148,7 +148,8 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl _document_types(std::move(document_types)), _repo(std::move(repo)), _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)), - _distribution(std::make_shared<const BmDistribution>(params.get_num_nodes())), + _real_distribution(std::make_shared<BmDistribution>(params.get_num_nodes(), params.get_redundancy())), + _distribution(_real_distribution), _nodes(params.get_num_nodes()), _cluster_controller(std::make_shared<BmClusterController>(*this, *_distribution)), _feed_handler() @@ -438,4 +439,10 @@ BmCluster::get_node_stats() return node_stats; } +void +BmCluster::propagate_cluster_state() +{ + _cluster_controller->propagate_cluster_state(); +} + } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h index 8615b44bd7e..62ec710c296 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h @@ -26,6 +26,7 @@ namespace storage::rpc { class SharedRpcResources; } namespace search::bmcluster { class BmClusterController; +class BmDistribution; class BmFeed; class BmMessageBus; class BmNode; @@ -55,6 +56,7 @@ class BmCluster { std::shared_ptr<DocumenttypesConfig> _document_types; std::shared_ptr<const document::DocumentTypeRepo> _repo; std::unique_ptr<const document::FieldSetRepo> _field_set_repo; + std::shared_ptr<BmDistribution> _real_distribution; std::shared_ptr<const IBmDistribution> _distribution; std::vector<std::unique_ptr<BmNode>> _nodes; std::shared_ptr<BmClusterController> _cluster_controller; @@ -90,6 +92,8 @@ public: uint32_t get_num_nodes() const { return _nodes.size(); } BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; } std::vector<BmNodeStats> get_node_stats(); + BmDistribution& get_real_distribution() { return *_real_distribution; } + void propagate_cluster_state(); }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index 7766ab6c5b3..2a214130392 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -12,6 +12,7 @@ BmClusterParams::BmClusterParams() _enable_service_layer(false), _indexing_sequencer(), _num_nodes(1), + _redundancy(1), _response_threads(2), // Same default as in stor-filestor.def _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def @@ -42,6 +43,10 @@ BmClusterParams::check() const std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl; return false; } + if (_num_nodes < _redundancy) { + std::cerr << "Too high redundancy " << _redundancy << " with " << _num_nodes << " nodes" << std::endl; + return false; + } return true; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index 5bc6b97487c..68dff7f52ed 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -18,6 +18,7 @@ class BmClusterParams bool _enable_service_layer; vespalib::string _indexing_sequencer; uint32_t _num_nodes; + uint32_t _redundancy; uint32_t _response_threads; uint32_t _rpc_events_before_wakeup; uint32_t _rpc_network_threads; @@ -36,6 +37,7 @@ public: bool get_enable_distributor() const { return _enable_distributor; } const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_num_nodes() const { return _num_nodes; } + uint32_t get_redundancy() const { return _redundancy; } uint32_t get_response_threads() const { return _response_threads; } uint32_t get_rpc_events_before_wakeup() const { return _rpc_events_before_wakeup; } uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } @@ -55,6 +57,7 @@ public: void set_enable_service_layer(bool value) { _enable_service_layer = value; } void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_num_nodes(uint32_t value) { _num_nodes = value; } + void set_redundancy(uint32_t value) { _redundancy = value; } void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; } void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp index 44b253b3d35..2ab1d4fd7f7 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp @@ -5,7 +5,12 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <cassert> +using storage::lib::ClusterState; using storage::lib::ClusterStateBundle; +using storage::lib::Node; +using storage::lib::NodeState; +using storage::lib::NodeType; +using storage::lib::State; namespace search::bmcluster { @@ -14,7 +19,7 @@ using DistributionConfigBuilder = BmDistribution::DistributionConfigBuilder; namespace { BmDistribution::DistributionConfig -make_distribution_config(uint32_t num_nodes) +make_distribution_config(uint32_t num_nodes, uint32_t redundancy) { DistributionConfigBuilder dc; { @@ -32,27 +37,28 @@ make_distribution_config(uint32_t num_nodes) group.partitions = ""; dc.group.push_back(std::move(group)); } - dc.redundancy = 1; - dc.readyCopies = 1; + dc.redundancy = redundancy; + dc.readyCopies = redundancy; return dc; } -ClusterStateBundle -make_cluster_state_bundle(uint32_t num_nodes) +ClusterState +make_cluster_state(uint32_t num_nodes) { vespalib::asciistream s; s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes; - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str())); - return bundle; + return storage::lib::ClusterState(s.str()); } } -BmDistribution::BmDistribution(uint32_t num_nodes) +BmDistribution::BmDistribution(uint32_t num_nodes, uint32_t redundancy) : _num_nodes(num_nodes), - _distribution_config(make_distribution_config(num_nodes)), + _distribution_config(make_distribution_config(num_nodes, redundancy)), _distribution(_distribution_config), - _cluster_state_bundle(make_cluster_state_bundle(num_nodes)) + _pending_cluster_state(make_cluster_state(num_nodes)), + _cluster_state_bundle(_pending_cluster_state), + _has_pending_cluster_state(false) { } @@ -95,4 +101,33 @@ BmDistribution::get_cluster_state_bundle() const return _cluster_state_bundle; } +void +BmDistribution::set_node_state(uint32_t node_idx, bool distributor, const State& state) +{ + const NodeType& node_type = distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE; + Node node(node_type, node_idx); + NodeState node_state(node_type, state); + _pending_cluster_state.setNodeState(node, node_state); + if (!_has_pending_cluster_state) { + _pending_cluster_state.setVersion(_pending_cluster_state.getVersion() + 1); + _has_pending_cluster_state = true; + } +} + +void +BmDistribution::set_node_state(uint32_t node_idx, const State& state) +{ + set_node_state(node_idx, false, state); + set_node_state(node_idx, true, state); +} + +void +BmDistribution::commit_cluster_state_change() +{ + if (_has_pending_cluster_state) { + _cluster_state_bundle = ClusterStateBundle(_pending_cluster_state); + _has_pending_cluster_state = false; + } +} + }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h index fde89a0d766..6be7592f561 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h @@ -19,16 +19,20 @@ class BmDistribution : public IBmDistribution uint32_t _num_nodes; DistributionConfigBuilder _distribution_config; storage::lib::Distribution _distribution; + storage::lib::ClusterState _pending_cluster_state; storage::lib::ClusterStateBundle _cluster_state_bundle; - + bool _has_pending_cluster_state; public: - BmDistribution(uint32_t num_nodes); + BmDistribution(uint32_t num_nodes, uint32_t redundancy); ~BmDistribution() override; uint32_t get_num_nodes() const override; uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const override; uint32_t get_distributor_node_idx(const document::Bucket & bucket) const override; DistributionConfig get_distribution_config() const override; storage::lib::ClusterStateBundle get_cluster_state_bundle() const override; + void set_node_state(uint32_t node_idx, bool distributor, const storage::lib::State& state); + void set_node_state(uint32_t node_idx, const storage::lib::State& state); + void commit_cluster_state_change(); }; }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp index 4d464923efa..83956dfc274 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.cpp @@ -16,11 +16,29 @@ using vespalib::makeLambdaTask; namespace search::bmcluster { +namespace { + +bool steady_buckets_stats(const std::optional<BmBucketsStats> buckets) +{ + if (!buckets.has_value()) { + return false; // No info available + } + auto& value = buckets.value(); + if (!value.get_valid()) { + return false; // Some information still missing + } + return value.get_buckets_pending() == 0u; +} + +} + BmNodeStatsReporter::BmNodeStatsReporter(BmCluster &cluster) : _cluster(cluster), _executor(1, 128_Ki), _mutex(), _cond(), + _change_time(), + _prev_node_stats(), _pending_report(1u), _started(false), _stop(false) @@ -99,6 +117,10 @@ BmNodeStatsReporter::report() } vespalib::string ss(s.str()); LOG(info, "%s", ss.c_str()); + if (!(node_stats == _prev_node_stats) || !steady_buckets_stats(total_buckets)) { + _change_time = std::chrono::steady_clock::now(); + _prev_node_stats = node_stats; + } } void diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h index 356d35474d1..1c8f013f5e6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node_stats_reporter.h @@ -10,6 +10,7 @@ namespace search::bmcluster { class BmCluster; +class BmNodeStats; /* * Class handling background reporting of node stats during feed or @@ -20,6 +21,8 @@ class BmNodeStatsReporter { vespalib::ThreadStackExecutor _executor; std::mutex _mutex; std::condition_variable _cond; + std::chrono::time_point<std::chrono::steady_clock> _change_time; + std::vector<BmNodeStats> _prev_node_stats; uint32_t _pending_report; bool _started; bool _stop; @@ -32,6 +35,7 @@ public: void start(std::chrono::milliseconds interval); void stop(); void report_now(); + std::chrono::time_point<std::chrono::steady_clock> get_change_time() const noexcept { return _change_time; } }; } |