diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-10-06 12:45:35 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-10-06 12:49:23 +0200 |
commit | 780f8c25677588810fa55b82692c468603162d66 (patch) | |
tree | bdefe091f0de4f7506f0437b2c8d74310f931608 /searchcore/src | |
parent | f63d33eb7a47c3bfd80229098a71eae1b3b48300 (diff) |
Enable refeed during document redistribution.
Diffstat (limited to 'searchcore/src')
4 files changed, 160 insertions, 23 deletions
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index 3fc42e50cdb..1272521ff54 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -54,6 +54,7 @@ using search::bmcluster::BmRange; using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; using storage::lib::State; +using vespalib::makeLambdaTask; namespace { @@ -74,7 +75,7 @@ enum class Mode { PERM_CRASH, TEMP_CRASH, REPLACE, - BAD, + BAD }; std::vector<vespalib::string> mode_names = { @@ -101,19 +102,44 @@ vespalib::string& get_mode_name(Mode mode) { return (i < mode_names.size()) ? mode_names[i] : bad_mode_name; } +enum ReFeedMode { + NONE, + PUT, + UPDATE, + BAD +}; + +std::vector<vespalib::string> refeed_mode_names = { + "none", + "put", + "update" +}; + +ReFeedMode get_refeed_mode(const vespalib::string& refeed_mode_name) { + for (uint32_t i = 0; i < refeed_mode_names.size(); ++i) { + if (refeed_mode_name == refeed_mode_names[i]) { + return static_cast<ReFeedMode>(i); + } + } + return ReFeedMode::BAD; +} + class BMParams : public BmClusterParams, public BmFeedParams { uint32_t _flip_nodes; Mode _mode; + ReFeedMode _refeed_mode; bool _use_feed_settle; public: BMParams(); uint32_t get_flip_nodes() const noexcept { return _flip_nodes; } Mode get_mode() const noexcept { return _mode; } + ReFeedMode get_refeed_mode() const noexcept { return _refeed_mode; } bool get_use_feed_settle() const noexcept { return _use_feed_settle; } void set_flip_nodes(uint32_t value) { _flip_nodes = value; } void set_mode(Mode value) { _mode = value; } + void set_refeed_mode(ReFeedMode value) { _refeed_mode = value; } void set_use_feed_settle(bool value) { _use_feed_settle = value; } bool check() const; }; @@ -123,6 +149,7 @@ BMParams::BMParams() BmFeedParams(), _flip_nodes(1u), _mode(Mode::GROW), + _refeed_mode(ReFeedMode::NONE), _use_feed_settle(false) { set_enable_service_layer(true); @@ -160,9 +187,55 @@ BMParams::check() const std::cerr << "Bad mode" << std::endl; return false; } + if (_refeed_mode == ReFeedMode::BAD) { + std::cerr << "Bad refeed-mode" << std::endl; + return false; + } return true; } +class ReFeed { + vespalib::ThreadStackExecutor _top_executor; + vespalib::ThreadStackExecutor _executor; + BmFeeder _feeder; + const vespalib::string _op_name; + const BMParams& _params; + int64_t& _time_bias; + const std::vector<vespalib::nbostream>& _feed; + + void run(); +public: + ReFeed(const BMParams& params, std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const vespalib::string& op_name); + ~ReFeed(); +}; + +ReFeed::ReFeed(const BMParams& params, std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const vespalib::string& op_name) + : _top_executor(1, 128_Ki), + _executor(params.get_client_threads(), 128_Ki), + _feeder(repo, feed_handler, _executor), + _op_name(op_name), + _params(params), + _time_bias(time_bias), + _feed(feed) +{ + _top_executor.execute(makeLambdaTask([this]() + { run(); })); +} + +ReFeed::~ReFeed() +{ + _feeder.stop(); + _top_executor.sync(); + _top_executor.shutdown(); +} + +void +ReFeed::run() +{ + std::this_thread::sleep_for(2s); + _feeder.run_feed_tasks_loop(_time_bias, _feed, _params, _op_name); +} + } class Benchmark { @@ -171,10 +244,14 @@ class Benchmark { std::shared_ptr<const DocumentTypeRepo> _repo; std::unique_ptr<BmCluster> _cluster; BmFeed _feed; + std::vector<vespalib::nbostream> _put_feed; + std::vector<vespalib::nbostream> _update_feed; + int64_t _time_bias; void adjust_cluster_state_before_feed(); void adjust_cluster_state_after_feed(); void adjust_cluster_state_after_first_redistribution(); + void make_feed(); void feed(); std::chrono::duration<double> redistribute(); @@ -189,7 +266,10 @@ Benchmark::Benchmark(const BMParams& params) _document_types(make_document_types()), _repo(document::DocumentTypeRepoFactory::make(*_document_types)), _cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)), - _feed(_repo) + _feed(_repo), + _put_feed(), + _update_feed(), + _time_bias(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count()) { _cluster->make_nodes(); } @@ -274,17 +354,25 @@ Benchmark::adjust_cluster_state_after_first_redistribution() } void +Benchmark::make_feed() +{ + vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); + _put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); + if (_params.get_refeed_mode() == ReFeedMode::UPDATE) { + _update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update"); + } +} + +void Benchmark::feed() { vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki); BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor); - auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put"); BmNodeStatsReporter reporter(*_cluster, false); reporter.start(500ms); - int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count(); LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); AvgSampler sampler; - feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put"); + feeder.run_feed_tasks(0, _time_bias, _put_feed, _params, sampler, "put"); reporter.report_now(); if (_params.get_use_feed_settle()) { LOG(info, "Settling feed"); @@ -302,6 +390,17 @@ Benchmark::redistribute() reporter.start(500ms); _cluster->propagate_cluster_state(); reporter.report_now(); + std::unique_ptr<ReFeed> refeed; + switch (_params.get_refeed_mode()) { + case ReFeedMode::PUT: + refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _put_feed, "put"); + break; + case ReFeedMode::UPDATE: + refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _update_feed, "update"); + break; + default: + ; + } for (;;) { auto duration = std::chrono::steady_clock::now() - reporter.get_change_time(); if (duration >= 6s) { @@ -309,6 +408,7 @@ Benchmark::redistribute() } std::this_thread::sleep_for(100ms); } + refeed.reset(); return reporter.get_change_time() - before; } @@ -317,6 +417,7 @@ Benchmark::run() { adjust_cluster_state_before_feed(); _cluster->start(_feed); + make_feed(); feed(); LOG(info, "--------------------------------"); auto old_snapshot = _cluster->get_bucket_db_snapshots(); @@ -382,6 +483,7 @@ App::usage() "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n" "[--nodes-per-group nodes-per-group]\n" "[--redundancy redundancy]\n" + "[--refeed-mode [none, put, update]\n" "[--rpc-events-before-wakeup events]\n" "[--rpc-network-threads threads]\n" "[--rpc-targets-per-node targets]\n" @@ -415,6 +517,7 @@ App::get_options() { "mode", 1, nullptr, 0 }, { "nodes-per-group", 1, nullptr, 0 }, { "redundancy", 1, nullptr, 0 }, + { "refeed-mode", 1, nullptr, 0 }, { "response-threads", 1, nullptr, 0 }, { "rpc-events-before-wakeup", 1, nullptr, 0 }, { "rpc-network-threads", 1, nullptr, 0 }, @@ -442,6 +545,7 @@ App::get_options() LONGOPT_MODE, LONGOPT_NODES_PER_GROUP, LONGOPT_REDUNDANCY, + LONGOPT_REFEED, LONGOPT_RESPONSE_THREADS, LONGOPT_RPC_EVENTS_BEFORE_WAKEUP, LONGOPT_RPC_NETWORK_THREADS, @@ -510,6 +614,12 @@ App::get_options() case LONGOPT_REDUNDANCY: _bm_params.set_redundancy(atoi(opt_argument)); break; + case LONGOPT_REFEED: + _bm_params.set_refeed_mode(get_refeed_mode(opt_argument)); + if (_bm_params.get_refeed_mode() == ReFeedMode::BAD) { + std::cerr << "Unknown refeed-mode name " << opt_argument << std::endl; + } + break; case LONGOPT_RESPONSE_THREADS: _bm_params.set_response_threads(atoi(opt_argument)); break; diff --git a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h index 69562d42276..deccb2520f5 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h @@ -7,20 +7,21 @@ namespace search::bmcluster { /* - * Class used to calculate average value of samples. + * Class used to calculate average feed rate. */ class AvgSampler { private: - double _total; - size_t _samples; + uint64_t _ops; + double _elapsed; public: - AvgSampler() : _total(0), _samples(0) {} - void sample(double val) { - _total += val; - ++_samples; + AvgSampler() : _ops(0), _elapsed(0.0) {} + void sample(uint64_t ops, double elapsed) { + _ops += ops; + _elapsed += elapsed; } - double avg() const { return _total / (double)_samples; } + double avg() const { return valid() ? (_ops / _elapsed) : 0.0; } + bool valid() const { return _elapsed != 0.0; } }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp index 55160f5d561..0e659e45136 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp @@ -37,7 +37,8 @@ BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& _feed_handler(feed_handler), _executor(executor), _all_fields(document::AllFields::NAME), - _use_timestamp(!_feed_handler.manages_timestamp()) + _use_timestamp(!_feed_handler.manages_timestamp()), + _stop(false) { } @@ -91,7 +92,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, } -void +uint32_t BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { @@ -99,11 +100,14 @@ BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostre PendingTracker pending_tracker(max_pending); _feed_handler.attach_bucket_info_queue(pending_tracker); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - for (uint32_t i = range.get_start(); i < range.get_end(); ++i) { + uint32_t op_count = 0; + for (uint32_t i = range.get_start(); i < range.get_end() && !_stop.load(std::memory_order_relaxed); ++i) { feed_operation(i, is, time_bias, pending_tracker); + ++op_count; } - assert(is.empty()); + assert(is.empty() || _stop.load(std::memory_order_relaxed)); pending_tracker.drain(); + return op_count; } void @@ -111,19 +115,37 @@ BmFeeder::run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespali { uint32_t old_errors = _feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); + std::atomic<uint32_t> atomic_op_count(0u); for (uint32_t i = 0; i < params.get_client_threads(); ++i) { auto range = params.get_range(i); - _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { feed_task(max_pending, range, serialized_feed, time_bias); })); + _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias, &atomic_op_count]() + { atomic_op_count += feed_task(max_pending, range, serialized_feed, time_bias); })); } _executor.sync(); + uint32_t op_count = atomic_op_count.load(std::memory_order_relaxed); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; uint32_t new_errors = _feed_handler.get_error_count() - old_errors; - double throughput = params.get_documents() / elapsed.count(); - sampler.sample(throughput); - LOG(info, "%sAsync: pass=%u, errors=%u, %ss/s: %8.2f", op_name.c_str(), pass, new_errors, op_name.c_str(), throughput); + double throughput = op_count / elapsed.count(); + sampler.sample(op_count, elapsed.count()); + LOG(info, "%sAsync: pass=%u, errors=%u, ops=%u of %u, %ss/s: %8.2f", op_name.c_str(), pass, new_errors, op_count, params.get_documents(), op_name.c_str(), throughput); time_bias += params.get_documents(); } +void +BmFeeder::stop() +{ + _stop.store(true, std::memory_order_release); +} + +void +BmFeeder::run_feed_tasks_loop(int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, const vespalib::string &op_name) +{ + AvgSampler sampler; + for (int pass = 0; !_stop.load(std::memory_order_relaxed); ++pass) { + run_feed_tasks(pass, time_bias, serialized_feed_v, params, sampler, op_name); + } + LOG(info, "%sAsync: AVG %s/s: %8.2f", op_name.c_str(), op_name.c_str(), sampler.avg()); +} + } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h index 1001f572bf2..c8236e7222b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h @@ -4,6 +4,7 @@ #include <vespa/document/bucket/bucketspace.h> #include <memory> +#include <atomic> namespace document { @@ -36,13 +37,16 @@ class BmFeeder { vespalib::ThreadStackExecutor& _executor; vespalib::string _all_fields; bool _use_timestamp; + std::atomic<bool> _stop; public: BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor); ~BmFeeder(); void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker); - void feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + uint32_t feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); void run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler, const vespalib::string& op_name); IBmFeedHandler& get_feed_handler() const { return _feed_handler; } + void stop(); + void run_feed_tasks_loop(int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, const vespalib::string &op_name); }; } |