diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-10-13 16:14:24 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-10-13 17:50:42 +0200 |
commit | a1d1b5078c979c936255e5f39757b6b0f36b2d8a (patch) | |
tree | bab3b5da82a7f62deb24a5735c608a3164f177ef /searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | |
parent | ff2afad2393bf61e4cadf1bde7bd678e92a5b435 (diff) |
Benchmark get operations.
Diffstat (limited to 'searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp')
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 74 |
1 files changed, 72 insertions, 2 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 842f24cbd65..506881ec61e 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -26,6 +26,7 @@ #include <vespa/config-upgrading.h> #include <vespa/config/common/configcontext.h> #include <vespa/document/datatype/documenttype.h> +#include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/document_type_repo_factory.h> @@ -121,6 +122,7 @@ using document::DocumentUpdate; using document::DocumenttypesConfig; using document::DocumenttypesConfigBuilder; using document::Field; +using document::FieldSetRepo; using document::FieldUpdate; using document::IntFieldValue; using document::test::makeBucketSpace; @@ -260,6 +262,7 @@ public: class BMParams { uint32_t _documents; uint32_t _client_threads; + uint32_t _get_passes; vespalib::string _indexing_sequencer; uint32_t _put_passes; uint32_t _update_passes; @@ -282,6 +285,7 @@ public: BMParams() : _documents(160000), _client_threads(1), + _get_passes(0), _indexing_sequencer(), _put_passes(2), _update_passes(1), @@ -305,6 +309,7 @@ public: uint32_t get_documents() const { return _documents; } uint32_t get_max_pending() const { return _max_pending; } uint32_t get_client_threads() const { return _client_threads; } + uint32_t get_get_passes() const { return _get_passes; } const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } @@ -321,6 +326,7 @@ public: void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } + void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; } void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } @@ -641,6 +647,7 @@ struct PersistenceProviderFixture { MyPersistenceEngineOwner _persistence_owner; MyResourceWriteFilter _write_filter; std::shared_ptr<PersistenceEngine> _persistence_engine; + std::unique_ptr<const FieldSetRepo> _field_set_repo; uint32_t _bucket_bits; MyServiceLayerConfig _service_layer_config; MyDistributorConfig _distributor_config; @@ -709,6 +716,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _persistence_owner(), _write_filter(), _persistence_engine(), + _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)), _bucket_bits(16), _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), @@ -733,7 +741,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _distributor_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); _message_bus_config.add_builders(_config_set); - _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, params.get_skip_get_spi_bucket_info()); + _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info()); } PersistenceProviderFixture::~PersistenceProviderFixture() @@ -825,7 +833,7 @@ PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const void PersistenceProviderFixture::create_buckets() { - SpiBmFeedHandler feed_handler(*_persistence_engine, false); + SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false); for (unsigned int i = 0; i < num_buckets(); ++i) { feed_handler.create_bucket(make_bucket(i)); } @@ -1128,6 +1136,45 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu time_bias += bm_params.get_documents(); } +void +get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed) +{ + LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); + feedbm::PendingTracker pending_tracker(max_pending); + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + BucketId bucket_id; + vespalib::string all_fields(document::AllFields::NAME); + auto bucket_space = f._bucket_space; + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(bucket_space, bucket_id); + DocumentId document_id(is); + f._feed_handler->get(bucket, all_fields, document_id, pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, + const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) +{ + uint32_t old_errors = f._feed_handler->get_error_count(); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { + auto range = bm_params.get_range(i); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() + { get_async_task(f, max_pending, range, serialized_feed); })); + } + executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + double throughput = bm_params.get_documents() / elapsed.count(); + sampler.sample(throughput); + LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput); +} + vespalib::nbostream make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { @@ -1214,6 +1261,22 @@ benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecu } void +benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, + const std::vector<vespalib::nbostream>& feed, const BMParams& params) +{ + if (params.get_get_passes() == 0) { + return; + } + LOG(info, "--------------------------------"); + LOG(info, "getAsync: %u small documents, passes=%u", params.get_documents(), params.get_get_passes()); + AvgSampler sampler; + for (uint32_t pass = 0; pass < params.get_get_passes(); ++pass) { + run_get_async_tasks(f, executor, pass, feed, params, sampler); + } + LOG(info, "getAsync: AVG gets/s: %8.2f", sampler.avg()); +} + +void benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params) { @@ -1258,6 +1321,7 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str()); benchmark_async_put(f, executor, time_bias, put_feed, bm_params); benchmark_async_update(f, executor, time_bias, update_feed, bm_params); + benchmark_async_get(f, executor, remove_feed, bm_params); benchmark_async_remove(f, executor, time_bias, remove_feed, bm_params); LOG(info, "--------------------------------"); @@ -1295,6 +1359,7 @@ App::usage() std::cerr << "vespa-feed-bm\n" "[--client-threads threads]\n" + "[--get-passes get-passes]\n" "[--indexing-sequencer [latency,throughput,adaptive]]\n" "[--max-pending max-pending]\n" "[--documents documents]\n" @@ -1324,6 +1389,7 @@ App::get_options() { "documents", 1, nullptr, 0 }, { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, + { "get-passes", 1, nullptr, 0 }, { "indexing-sequencer", 1, nullptr, 0 }, { "max-pending", 1, nullptr, 0 }, { "put-passes", 1, nullptr, 0 }, @@ -1343,6 +1409,7 @@ App::get_options() LONGOPT_DOCUMENTS, LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_GET_PASSES, LONGOPT_INDEXING_SEQUENCER, LONGOPT_MAX_PENDING, LONGOPT_PUT_PASSES, @@ -1375,6 +1442,9 @@ App::get_options() case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; + case LONGOPT_GET_PASSES: + _bm_params.set_get_passes(atoi(opt_argument)); + break; case LONGOPT_INDEXING_SEQUENCER: _bm_params.set_indexing_sequencer(opt_argument); break; |