diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-09-15 16:44:26 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-09-15 16:44:26 +0200 |
commit | ff46fe04c98a52962a57e5484dacd4901e20044d (patch) | |
tree | e01ebc8f223023713a8a82ae6e6ed970da384345 /searchcore | |
parent | ed29df6d44c73acd7efe5368b0e5e4c7a0720b80 (diff) |
Move portions of BMParams to BmFeedParams.
Move more feed functions to BmFeeder and BmFeed.
Diffstat (limited to 'searchcore')
10 files changed, 420 insertions, 264 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 8ed74d4e92f..451e9fdefbb 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -4,10 +4,13 @@ #include <vespa/document/repo/document_type_repo_factory.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/fastos/app.h> +#include <vespa/searchcore/bmcluster/avg_sampler.h> #include <vespa/searchcore/bmcluster/bm_cluster.h> #include <vespa/searchcore/bmcluster/bm_cluster_controller.h> #include <vespa/searchcore/bmcluster/bm_cluster_params.h> #include <vespa/searchcore/bmcluster/bm_feed.h> +#include <vespa/searchcore/bmcluster/bm_feeder.h> +#include <vespa/searchcore/bmcluster/bm_feed_params.h> #include <vespa/searchcore/bmcluster/bm_node.h> #include <vespa/searchcore/bmcluster/bm_range.h> #include <vespa/searchcore/bmcluster/bucket_selector.h> @@ -33,11 +36,14 @@ using document::DocumentTypeRepo; using document::DocumentTypeRepoFactory; using document::DocumenttypesConfig; using document::DocumenttypesConfigBuilder; +using search::bmcluster::AvgSampler; using search::bmcluster::BmClusterController; using search::bmcluster::IBmFeedHandler; using search::bmcluster::BmClusterParams; using search::bmcluster::BmCluster; using search::bmcluster::BmFeed; +using search::bmcluster::BmFeedParams; +using search::bmcluster::BmFeeder; using search::bmcluster::BmNode; using search::bmcluster::BmRange; using search::bmcluster::BucketSelector; @@ -58,41 +64,27 @@ std::shared_ptr<DocumenttypesConfig> make_document_types() { return std::make_shared<DocumenttypesConfig>(builder.config()); } -class BMParams : public BmClusterParams { - uint32_t _documents; - uint32_t _client_threads; +class BMParams : public BmClusterParams, + public BmFeedParams +{ uint32_t _get_passes; uint32_t _put_passes; uint32_t _update_passes; uint32_t _remove_passes; - uint32_t _max_pending; - uint32_t get_start(uint32_t thread_id) const { - return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads); - } public: BMParams() - : _documents(160000), - _client_threads(1), + : BmClusterParams(), + BmFeedParams(), _get_passes(0), _put_passes(2), _update_passes(1), - _remove_passes(2), - _max_pending(1000) + _remove_passes(2) { } - BmRange get_range(uint32_t thread_id) const { - return BmRange(get_start(thread_id), get_start(thread_id + 1)); - } - uint32_t get_documents() const { return _documents; } - uint32_t get_max_pending() const { return _max_pending; } - uint32_t get_client_threads() const { return _client_threads; } uint32_t get_get_passes() const { return _get_passes; } uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } - void set_documents(uint32_t documents_in) { _documents = documents_in; } - void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } - void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } @@ -106,16 +98,7 @@ BMParams::check() const if (!BmClusterParams::check()) { return false; } - if (_client_threads < 1) { - std::cerr << "Too few client threads: " << _client_threads << std::endl; - return false; - } - if (_client_threads > 256) { - std::cerr << "Too many client threads: " << _client_threads << std::endl; - return false; - } - if (_documents < _client_threads) { - std::cerr << "Too few documents: " << _documents << std::endl; + if (!BmFeedParams::check()) { return false; } if (_put_passes < 1) { @@ -133,7 +116,6 @@ struct PersistenceProviderFixture { std::shared_ptr<const DocumentTypeRepo> _repo; std::unique_ptr<BmCluster> _bm_cluster; BmFeed _feed; - IBmFeedHandler* _feed_handler; explicit PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); @@ -143,155 +125,27 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) : _document_types(make_document_types()), _repo(document::DocumentTypeRepoFactory::make(*_document_types)), _bm_cluster(std::make_unique<BmCluster>(base_dir, base_port, params, _document_types, _repo)), - _feed(_repo), - _feed_handler(nullptr) + _feed(_repo) { _bm_cluster->make_nodes(); } PersistenceProviderFixture::~PersistenceProviderFixture() = default; -std::vector<vespalib::nbostream> -make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) -{ - LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); - std::vector<vespalib::nbostream> serialized_feed_v; - auto start_time = std::chrono::steady_clock::now(); - serialized_feed_v.resize(bm_params.get_client_threads()); - for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { - auto range = bm_params.get_range(i); - BucketSelector bucket_selector(i, bm_params.get_client_threads(), num_buckets); - executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]() - { serialized_feed_v[i] = func(range, bucket_selector); })); - } - executor.sync(); - auto end_time = std::chrono::steady_clock::now(); - std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f %s data elements/s", bm_params.get_documents() / elapsed.count(), label.c_str()); - return serialized_feed_v; -} - -class AvgSampler { -private: - double _total; - size_t _samples; - -public: - AvgSampler() : _total(0), _samples(0) {} - void sample(double val) { - _total += val; - ++_samples; - } - double avg() const { return _total / (double)_samples; } -}; - -void -run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, - const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) -{ - auto& feed = f._feed; - auto& feed_handler = *f._feed_handler; - uint32_t old_errors = feed_handler.get_error_count(); - auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { - auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); - } - executor.sync(); - auto end_time = std::chrono::steady_clock::now(); - std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = feed_handler.get_error_count() - old_errors; - double throughput = bm_params.get_documents() / elapsed.count(); - sampler.sample(throughput); - LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput); - time_bias += bm_params.get_documents(); -} - -void -run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, - const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) -{ - auto& feed = f._feed; - auto& feed_handler = *f._feed_handler; - uint32_t old_errors = feed_handler.get_error_count(); - auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { - auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); - } - executor.sync(); - auto end_time = std::chrono::steady_clock::now(); - std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = feed_handler.get_error_count() - old_errors; - double throughput = bm_params.get_documents() / elapsed.count(); - sampler.sample(throughput); - LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput); - time_bias += bm_params.get_documents(); -} - -void -run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, - const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) -{ - auto& feed = f._feed; - auto& feed_handler = *f._feed_handler; - uint32_t old_errors = feed_handler.get_error_count(); - auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { - auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() - { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); })); - } - executor.sync(); - auto end_time = std::chrono::steady_clock::now(); - std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = feed_handler.get_error_count() - old_errors; - double throughput = bm_params.get_documents() / elapsed.count(); - sampler.sample(throughput); - LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput); -} - -void -run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, - const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) -{ - auto& feed = f._feed; - auto& feed_handler = *f._feed_handler; - uint32_t old_errors = feed_handler.get_error_count(); - auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { - auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); - } - executor.sync(); - auto end_time = std::chrono::steady_clock::now(); - std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = feed_handler.get_error_count() - old_errors; - double throughput = bm_params.get_documents() / elapsed.count(); - sampler.sample(throughput); - LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput); - time_bias += bm_params.get_documents(); -} - void -benchmark_async_put(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, - int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params) +benchmark_async_put(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params) { AvgSampler sampler; LOG(info, "--------------------------------"); LOG(info, "putAsync: %u small documents, passes=%u", params.get_documents(), params.get_put_passes()); for (uint32_t pass = 0; pass < params.get_put_passes(); ++pass) { - run_put_async_tasks(f, executor, pass, time_bias, feed, params, sampler); + feeder.run_put_async_tasks(pass, time_bias, serialized_feed, params, sampler); } LOG(info, "putAsync: AVG puts/s: %8.2f", sampler.avg()); } void -benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, - int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params) +benchmark_async_update(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params) { if (params.get_update_passes() == 0) { return; @@ -300,14 +154,13 @@ benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecu LOG(info, "--------------------------------"); LOG(info, "updateAsync: %u small documents, passes=%u", params.get_documents(), params.get_update_passes()); for (uint32_t pass = 0; pass < params.get_update_passes(); ++pass) { - run_update_async_tasks(f, executor, pass, time_bias, feed, params, sampler); + feeder.run_update_async_tasks(pass, time_bias, serialized_feed, params, sampler); } LOG(info, "updateAsync: AVG updates/s: %8.2f", sampler.avg()); } void -benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, - const std::vector<vespalib::nbostream>& feed, const BMParams& params) +benchmark_async_get(BmFeeder& feeder, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params) { if (params.get_get_passes() == 0) { return; @@ -316,14 +169,13 @@ benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor LOG(info, "getAsync: %u small documents, passes=%u", params.get_documents(), params.get_get_passes()); AvgSampler sampler; for (uint32_t pass = 0; pass < params.get_get_passes(); ++pass) { - run_get_async_tasks(f, executor, pass, feed, params, sampler); + feeder.run_get_async_tasks(pass, serialized_feed, params, sampler); } LOG(info, "getAsync: AVG gets/s: %8.2f", sampler.avg()); } void -benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, - int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params) +benchmark_async_remove(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params) { if (params.get_remove_passes() == 0) { return; @@ -332,32 +184,31 @@ benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecu LOG(info, "removeAsync: %u small documents, passes=%u", params.get_documents(), params.get_remove_passes()); AvgSampler sampler; for (uint32_t pass = 0; pass < params.get_remove_passes(); ++pass) { - run_remove_async_tasks(f, executor, pass, time_bias, feed, params, sampler); + feeder.run_remove_async_tasks(pass, time_bias, serialized_feed, params, sampler); } LOG(info, "removeAsync: AVG removes/s: %8.2f", sampler.avg()); } -void benchmark_async_spi(const BMParams &bm_params) +void benchmark_async(const BMParams &bm_params) { vespalib::rmdir(base_dir, true); PersistenceProviderFixture f(bm_params); auto& cluster = *f._bm_cluster; cluster.start(f._feed); - f._feed_handler = cluster.get_feed_handler(); vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki); + BmFeeder feeder(f._repo, *cluster.get_feed_handler(), executor); auto& feed = f._feed; - auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put"); - auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); - auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); + auto put_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put"); + auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); + auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); int64_t time_bias = 1; - LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str()); - benchmark_async_put(f, executor, time_bias, put_feed, bm_params); - benchmark_async_update(f, executor, time_bias, update_feed, bm_params); - benchmark_async_get(f, executor, remove_feed, bm_params); - benchmark_async_remove(f, executor, time_bias, remove_feed, bm_params); + LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str()); + benchmark_async_put(feeder, time_bias, put_feed, bm_params); + benchmark_async_update(feeder, time_bias, update_feed, bm_params); + benchmark_async_get(feeder, remove_feed, bm_params); + benchmark_async_remove(feeder, time_bias, remove_feed, bm_params); LOG(info, "--------------------------------"); - f._feed_handler = nullptr; cluster.stop(); } @@ -562,7 +413,7 @@ App::Main() usage(); return 1; } - benchmark_async_spi(_bm_params); + benchmark_async(_bm_params); return 0; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 17f9bff38d3..184ada0ab38 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -6,6 +6,8 @@ vespa_add_library(searchcore_bmcluster STATIC bm_cluster_params.cpp bm_distribution.cpp bm_feed.cpp + bm_feeder.cpp + bm_feed_params.cpp bm_message_bus.cpp bm_message_bus_routes.cpp bm_node.cpp diff --git a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h new file mode 100644 index 00000000000..69562d42276 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstddef> + +namespace search::bmcluster { + +/* + * Class used to calculate average value of samples. + */ +class AvgSampler { +private: + double _total; + size_t _samples; + +public: + AvgSampler() : _total(0), _samples(0) {} + void sample(double val) { + _total += val; + ++_samples; + } + double avg() const { return _total / (double)_samples; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp index e082f2b96a1..1a822dc1769 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bm_feed.h" +#include "avg_sampler.h" +#include "bm_feed_params.h" #include "bm_range.h" #include "bucket_selector.h" #include "pending_tracker.h" @@ -15,8 +17,11 @@ #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/update/documentupdate.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <cassert> +#include <chrono> #include <vespa/log/log.h> LOG_SETUP(".bmcluster.bm_feed"); @@ -29,6 +34,7 @@ using document::DocumentTypeRepo; using document::DocumentUpdate; using document::IntFieldValue; using document::FieldUpdate; +using vespalib::makeLambdaTask; namespace search::bmcluster { @@ -115,81 +121,24 @@ BmFeed::make_remove_feed(BmRange range, BucketSelector bucket_selector) } -void -BmFeed::put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +std::vector<vespalib::nbostream> +BmFeed::make_feed(vespalib::ThreadStackExecutor& executor, const BmFeedParams& params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) { - LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - PendingTracker pending_tracker(max_pending); - feed_handler.attach_bucket_info_queue(pending_tracker); - auto &repo = *_repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - document::BucketId bucket_id; - bool use_timestamp = !feed_handler.manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(_bucket_space, bucket_id); - auto document = std::make_unique<Document>(repo, is); - feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - -void -BmFeed::update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - PendingTracker pending_tracker(max_pending); - feed_handler.attach_bucket_info_queue(pending_tracker); - auto &repo = *_repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - document::BucketId bucket_id; - bool use_timestamp = !feed_handler.manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(_bucket_space, bucket_id); - auto document_update = DocumentUpdate::createHEAD(repo, is); - feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - -void -BmFeed::get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed) -{ - LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); - search::bmcluster::PendingTracker pending_tracker(max_pending); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - document::BucketId bucket_id; - vespalib::string all_fields(document::AllFields::NAME); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(_bucket_space, bucket_id); - DocumentId document_id(is); - feed_handler.get(bucket, all_fields, document_id, pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - -void -BmFeed::remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - search::bmcluster::PendingTracker pending_tracker(max_pending); - feed_handler.attach_bucket_info_queue(pending_tracker); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - document::BucketId bucket_id; - bool use_timestamp = !feed_handler.manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(_bucket_space, bucket_id); - DocumentId document_id(is); - feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); + LOG(info, "make_feed %s %u small documents", label.c_str(), params.get_documents()); + std::vector<vespalib::nbostream> serialized_feed_v; + auto start_time = std::chrono::steady_clock::now(); + serialized_feed_v.resize(params.get_client_threads()); + for (uint32_t i = 0; i < params.get_client_threads(); ++i) { + auto range = params.get_range(i); + BucketSelector bucket_selector(i, params.get_client_threads(), num_buckets); + executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]() + { serialized_feed_v[i] = func(range, bucket_selector); })); } - assert(is.empty()); - pending_tracker.drain(); + executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + LOG(info, "%8.2f %s data elements/s", params.get_documents() / elapsed.count(), label.c_str()); + return serialized_feed_v; } } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h index a6afe7b10d9..f8596723634 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h @@ -4,7 +4,6 @@ #include <vespa/document/base/documentid.h> #include <vespa/document/bucket/bucketspace.h> -#include <vespa/document/bucket/bucketid.h> #include <vespa/document/bucket/bucket.h> namespace document { @@ -17,13 +16,18 @@ class Field; } -namespace vespalib { class nbostream; } +namespace vespalib { + +class ThreadStackExecutor; +class nbostream; + +} namespace search::bmcluster { +class BmFeedParams; class BmRange; class BucketSelector; -class IBmFeedHandler; /* * Class to generate synthetic feed of documents. @@ -35,7 +39,7 @@ class BmFeed { uint32_t _bucket_bits; document::BucketSpace _bucket_space; public: - + BmFeed(std::shared_ptr<const document::DocumentTypeRepo> document_types); ~BmFeed(); uint32_t num_buckets() const { return (1u << _bucket_bits); } @@ -48,10 +52,7 @@ public: vespalib::nbostream make_put_feed(BmRange range, BucketSelector bucket_selector); vespalib::nbostream make_update_feed(BmRange range, BucketSelector bucket_selector); vespalib::nbostream make_remove_feed(BmRange range, BucketSelector bucket_selector); - void put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); - void update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); - void get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed); - void remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + std::vector<vespalib::nbostream> make_feed(vespalib::ThreadStackExecutor& executor, const BmFeedParams& bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string& label); }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp new file mode 100644 index 00000000000..773232556e7 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp @@ -0,0 +1,42 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_feed_params.h" +#include "bm_range.h" +#include <iostream> + +namespace search::bmcluster { + +BmFeedParams::BmFeedParams() + : _client_threads(1), + _documents(160000), + _max_pending(1000) +{ +} + +BmFeedParams::~BmFeedParams() = default; + +BmRange +BmFeedParams::get_range(uint32_t thread_id) const +{ + return BmRange(get_start(thread_id), get_start(thread_id + 1)); +} + +bool +BmFeedParams::check() const +{ + if (_client_threads < 1) { + std::cerr << "Too few client threads: " << _client_threads << std::endl; + return false; + } + if (_client_threads > 256) { + std::cerr << "Too many client threads: " << _client_threads << std::endl; + return false; + } + if (_documents < _client_threads) { + std::cerr << "Too few documents: " << _documents << std::endl; + return false; + } + return true; +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h new file mode 100644 index 00000000000..bc992422982 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> +#include <vespa/vespalib/stllike/string.h> + +namespace search::bmcluster { + +class BmRange; + +/* + * Parameters for generating synthetic feed of documents and for + * feeding them to the cluster. + */ +class BmFeedParams +{ + uint32_t _client_threads; + uint32_t _documents; + uint32_t _max_pending; + uint32_t get_start(uint32_t thread_id) const { + return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads); + } +public: + BmFeedParams(); + ~BmFeedParams(); + uint32_t get_client_threads() const { return _client_threads; } + uint32_t get_documents() const { return _documents; } + uint32_t get_max_pending() const { return _max_pending; } + BmRange get_range(uint32_t thread_id) const; + void set_documents(uint32_t documents_in) { _documents = documents_in; } + void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } + void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } + bool check() const; +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp new file mode 100644 index 00000000000..1be39f551b9 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp @@ -0,0 +1,199 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_feeder.h" +#include "avg_sampler.h" +#include "bm_feed_params.h" +#include "bm_range.h" +#include "bucket_selector.h" +#include "pending_tracker.h" +#include "i_bm_feed_handler.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/bucket.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <cassert> +#include <chrono> + +#include <vespa/log/log.h> +LOG_SETUP(".bmcluster.bm_feeder"); + +using document::Document; +using document::DocumentId; +using document::DocumentTypeRepo; +using document::DocumentUpdate; +using vespalib::makeLambdaTask; + +namespace search::bmcluster { + +BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor) + : _repo(std::move(repo)), + _bucket_space(document::test::makeBucketSpace("test")), + _feed_handler(feed_handler), + _executor(executor) +{ +} + +BmFeeder::~BmFeeder() = default; + +void +BmFeeder::put_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + _feed_handler.attach_bucket_info_queue(pending_tracker); + auto &repo = *_repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !_feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + auto document = std::make_unique<Document>(repo, is); + _feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeeder::update_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + _feed_handler.attach_bucket_info_queue(pending_tracker); + auto &repo = *_repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !_feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + auto document_update = DocumentUpdate::createHEAD(repo, is); + _feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeeder::get_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed) +{ + LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + vespalib::string all_fields(document::AllFields::NAME); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + DocumentId document_id(is); + _feed_handler.get(bucket, all_fields, document_id, pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeeder::remove_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + _feed_handler.attach_bucket_info_queue(pending_tracker); + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !_feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + DocumentId document_id(is); + _feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeeder::run_put_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler) +{ + uint32_t old_errors = _feed_handler.get_error_count(); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < params.get_client_threads(); ++i) { + auto range = params.get_range(i); + _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { put_async_task(max_pending, range, serialized_feed, time_bias); })); + } + _executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + uint32_t new_errors = _feed_handler.get_error_count() - old_errors; + double throughput = params.get_documents() / elapsed.count(); + sampler.sample(throughput); + LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput); + time_bias += params.get_documents(); +} + +void +BmFeeder::run_update_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler) +{ + uint32_t old_errors = _feed_handler.get_error_count(); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < params.get_client_threads(); ++i) { + auto range = params.get_range(i); + _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { update_async_task(max_pending, range, serialized_feed, time_bias); })); + } + _executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + uint32_t new_errors = _feed_handler.get_error_count() - old_errors; + double throughput = params.get_documents() / elapsed.count(); + sampler.sample(throughput); + LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput); + time_bias += params.get_documents(); +} + +void +BmFeeder::run_get_async_tasks(int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler) +{ + uint32_t old_errors = _feed_handler.get_error_count(); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < params.get_client_threads(); ++i) { + auto range = params.get_range(i); + _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() + { get_async_task(max_pending, range, serialized_feed); })); + } + _executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + uint32_t new_errors = _feed_handler.get_error_count() - old_errors; + double throughput = params.get_documents() / elapsed.count(); + sampler.sample(throughput); + LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput); +} + +void +BmFeeder::run_remove_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler) +{ + uint32_t old_errors = _feed_handler.get_error_count(); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < params.get_client_threads(); ++i) { + auto range = params.get_range(i); + _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { remove_async_task(max_pending, range, serialized_feed, time_bias); })); + } + _executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + uint32_t new_errors = _feed_handler.get_error_count() - old_errors; + double throughput = params.get_documents() / elapsed.count(); + sampler.sample(throughput); + LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput); + time_bias += params.get_documents(); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h new file mode 100644 index 00000000000..0e404f731c6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h @@ -0,0 +1,50 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/bucket/bucketspace.h> +#include <memory> + +namespace document { + +class DocumentTypeRepo; + +} + +namespace vespalib { + +class ThreadStackExecutor; +class nbostream; + +} + +namespace search::bmcluster { + +class AvgSampler; +class BmFeedParams; +class BmRange; +class IBmFeedHandler; + +/* + * Class to feed serialized feed operations to a feed handler. + */ +class BmFeeder { + std::shared_ptr<const document::DocumentTypeRepo> _repo; + document::BucketSpace _bucket_space; + IBmFeedHandler& _feed_handler; + vespalib::ThreadStackExecutor& _executor; +public: + BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor); + ~BmFeeder(); + void put_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + void update_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + void get_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed); + void remove_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + void run_put_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler); + void run_update_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler); + void run_get_async_tasks(int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler); + void run_remove_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler); + IBmFeedHandler& get_feed_handler() const { return _feed_handler; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp index 4de1f21b86f..0bf53a589d6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp @@ -34,4 +34,3 @@ BucketInfoQueue::get_bucket_info_loop() } } - |