From 780f8c25677588810fa55b82692c468603162d66 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 6 Oct 2021 12:45:35 +0200 Subject: Enable refeed during document redistribution. --- .../vespa_redistribute_bm.cpp | 120 ++++++++++++++++++++- 1 file changed, 115 insertions(+), 5 deletions(-) (limited to 'searchcore/src/apps/vespa-redistribute-bm') diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index 3fc42e50cdb..1272521ff54 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -54,6 +54,7 @@ using search::bmcluster::BmRange; using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; using storage::lib::State; +using vespalib::makeLambdaTask; namespace { @@ -74,7 +75,7 @@ enum class Mode { PERM_CRASH, TEMP_CRASH, REPLACE, - BAD, + BAD }; std::vector mode_names = { @@ -101,19 +102,44 @@ vespalib::string& get_mode_name(Mode mode) { return (i < mode_names.size()) ? mode_names[i] : bad_mode_name; } +enum ReFeedMode { + NONE, + PUT, + UPDATE, + BAD +}; + +std::vector refeed_mode_names = { + "none", + "put", + "update" +}; + +ReFeedMode get_refeed_mode(const vespalib::string& refeed_mode_name) { + for (uint32_t i = 0; i < refeed_mode_names.size(); ++i) { + if (refeed_mode_name == refeed_mode_names[i]) { + return static_cast(i); + } + } + return ReFeedMode::BAD; +} + class BMParams : public BmClusterParams, public BmFeedParams { uint32_t _flip_nodes; Mode _mode; + ReFeedMode _refeed_mode; bool _use_feed_settle; public: BMParams(); uint32_t get_flip_nodes() const noexcept { return _flip_nodes; } Mode get_mode() const noexcept { return _mode; } + ReFeedMode get_refeed_mode() const noexcept { return _refeed_mode; } bool get_use_feed_settle() const noexcept { return _use_feed_settle; } void set_flip_nodes(uint32_t value) { _flip_nodes = value; } void set_mode(Mode value) { _mode = value; } + void set_refeed_mode(ReFeedMode value) { _refeed_mode = value; } void set_use_feed_settle(bool value) { _use_feed_settle = value; } bool check() const; }; @@ -123,6 +149,7 @@ BMParams::BMParams() BmFeedParams(), _flip_nodes(1u), _mode(Mode::GROW), + _refeed_mode(ReFeedMode::NONE), _use_feed_settle(false) { set_enable_service_layer(true); @@ -160,9 +187,55 @@ BMParams::check() const std::cerr << "Bad mode" << std::endl; return false; } + if (_refeed_mode == ReFeedMode::BAD) { + std::cerr << "Bad refeed-mode" << std::endl; + return false; + } return true; } +class ReFeed { + vespalib::ThreadStackExecutor _top_executor; + vespalib::ThreadStackExecutor _executor; + BmFeeder _feeder; + const vespalib::string _op_name; + const BMParams& _params; + int64_t& _time_bias; + const std::vector& _feed; + + void run(); +public: + ReFeed(const BMParams& params, std::shared_ptr repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector& feed, const vespalib::string& op_name); + ~ReFeed(); +}; + +ReFeed::ReFeed(const BMParams& params, std::shared_ptr repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector& feed, const vespalib::string& op_name) + : _top_executor(1, 128_Ki), + _executor(params.get_client_threads(), 128_Ki), + _feeder(repo, feed_handler, _executor), + _op_name(op_name), + _params(params), + _time_bias(time_bias), + _feed(feed) +{ + _top_executor.execute(makeLambdaTask([this]() + { run(); })); +} + +ReFeed::~ReFeed() +{ + _feeder.stop(); + _top_executor.sync(); + _top_executor.shutdown(); +} + +void +ReFeed::run() +{ + std::this_thread::sleep_for(2s); + _feeder.run_feed_tasks_loop(_time_bias, _feed, _params, _op_name); +} + } class Benchmark { @@ -171,10 +244,14 @@ class Benchmark { std::shared_ptr _repo; std::unique_ptr _cluster; BmFeed _feed; + std::vector _put_feed; + std::vector _update_feed; + int64_t _time_bias; void adjust_cluster_state_before_feed(); void adjust_cluster_state_after_feed(); void adjust_cluster_state_after_first_redistribution(); + void make_feed(); void feed(); std::chrono::duration redistribute(); @@ -189,7 +266,10 @@ Benchmark::Benchmark(const BMParams& params) _document_types(make_document_types()), _repo(document::DocumentTypeRepoFactory::make(*_document_types)), _cluster(std::make_unique(base_dir, base_port, _params, _document_types, _repo)), - _feed(_repo) + _feed(_repo), + _put_feed(), + _update_feed(), + _time_bias(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch() - 24h).count()) { _cluster->make_nodes(); } @@ -273,18 +353,26 @@ Benchmark::adjust_cluster_state_after_first_redistribution() dist.commit_cluster_state_change(); } +void +Benchmark::make_feed() +{ + vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); + _put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); + if (_params.get_refeed_mode() == ReFeedMode::UPDATE) { + _update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update"); + } +} + void Benchmark::feed() { vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor); - auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); BmNodeStatsReporter reporter(*_cluster, false); reporter.start(500ms); - int64_t time_bias = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch() - 24h).count(); LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); AvgSampler sampler; - feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put"); + feeder.run_feed_tasks(0, _time_bias, _put_feed, _params, sampler, "put"); reporter.report_now(); if (_params.get_use_feed_settle()) { LOG(info, "Settling feed"); @@ -302,6 +390,17 @@ Benchmark::redistribute() reporter.start(500ms); _cluster->propagate_cluster_state(); reporter.report_now(); + std::unique_ptr refeed; + switch (_params.get_refeed_mode()) { + case ReFeedMode::PUT: + refeed = std::make_unique(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _put_feed, "put"); + break; + case ReFeedMode::UPDATE: + refeed = std::make_unique(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _update_feed, "update"); + break; + default: + ; + } for (;;) { auto duration = std::chrono::steady_clock::now() - reporter.get_change_time(); if (duration >= 6s) { @@ -309,6 +408,7 @@ Benchmark::redistribute() } std::this_thread::sleep_for(100ms); } + refeed.reset(); return reporter.get_change_time() - before; } @@ -317,6 +417,7 @@ Benchmark::run() { adjust_cluster_state_before_feed(); _cluster->start(_feed); + make_feed(); feed(); LOG(info, "--------------------------------"); auto old_snapshot = _cluster->get_bucket_db_snapshots(); @@ -382,6 +483,7 @@ App::usage() "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n" "[--nodes-per-group nodes-per-group]\n" "[--redundancy redundancy]\n" + "[--refeed-mode [none, put, update]\n" "[--rpc-events-before-wakeup events]\n" "[--rpc-network-threads threads]\n" "[--rpc-targets-per-node targets]\n" @@ -415,6 +517,7 @@ App::get_options() { "mode", 1, nullptr, 0 }, { "nodes-per-group", 1, nullptr, 0 }, { "redundancy", 1, nullptr, 0 }, + { "refeed-mode", 1, nullptr, 0 }, { "response-threads", 1, nullptr, 0 }, { "rpc-events-before-wakeup", 1, nullptr, 0 }, { "rpc-network-threads", 1, nullptr, 0 }, @@ -442,6 +545,7 @@ App::get_options() LONGOPT_MODE, LONGOPT_NODES_PER_GROUP, LONGOPT_REDUNDANCY, + LONGOPT_REFEED, LONGOPT_RESPONSE_THREADS, LONGOPT_RPC_EVENTS_BEFORE_WAKEUP, LONGOPT_RPC_NETWORK_THREADS, @@ -510,6 +614,12 @@ App::get_options() case LONGOPT_REDUNDANCY: _bm_params.set_redundancy(atoi(opt_argument)); break; + case LONGOPT_REFEED: + _bm_params.set_refeed_mode(get_refeed_mode(opt_argument)); + if (_bm_params.get_refeed_mode() == ReFeedMode::BAD) { + std::cerr << "Unknown refeed-mode name " << opt_argument << std::endl; + } + break; case LONGOPT_RESPONSE_THREADS: _bm_params.set_response_threads(atoi(opt_argument)); break; -- cgit v1.2.3