diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-03 20:21:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-03 20:21:33 +0000 |
commit | 3649e82b9df2b461981274be42419c35dd955913 (patch) | |
tree | a8ac75752176e4c33b0994cb2f24ed951f7eeafe | |
parent | 5d8748786cd3db3b4cdc983c83bf65005dcdd149 (diff) |
Add maxpending which by default is 1000 to control max inflight.
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 26e07e7901a..2394f918c54 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -237,7 +237,7 @@ public: _num_buckets((num_buckets_in / _threads) * _threads) { } - uint64_t operator()(uint32_t i) { + uint64_t operator()(uint32_t i) const { return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; } }; @@ -265,6 +265,7 @@ class BMParams { uint32_t _remove_passes; uint32_t _rpc_network_threads; uint32_t _response_threads; + uint32_t _max_pending; bool _enable_distributor; bool _enable_service_layer; bool _use_document_api; @@ -284,6 +285,7 @@ public: _remove_passes(2), _rpc_network_threads(1), // Same default as in stor-communicationmanager.def _response_threads(2), // Same default as in stor-filestor.def + _max_pending(1000), _enable_distributor(false), _enable_service_layer(false), _use_document_api(false), @@ -296,6 +298,7 @@ public: return BMRange(get_start(thread_id), get_start(thread_id + 1)); } uint32_t get_documents() const { return _documents; } + uint32_t get_max_pending() const { return _max_pending; } uint32_t get_client_threads() const { return _client_threads; } const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_put_passes() const { return _put_passes; } @@ -309,6 +312,7 @@ public: bool get_use_storage_chain() const { return _use_storage_chain; } bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; } void set_documents(uint32_t documents_in) { _documents = documents_in; } + void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } @@ -318,7 +322,7 @@ public: void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } - void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; } + void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in; } void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; } @@ -639,7 +643,7 @@ struct PersistenceProviderFixture { std::unique_ptr<storage::DistributorProcess> _distributor; std::unique_ptr<BmMessageBus> _message_bus; - PersistenceProviderFixture(const BMParams& params); + explicit PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); void create_document_db(const BMParams & params); uint32_t num_buckets() const { return (1u << _bucket_bits); } @@ -998,10 +1002,10 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st } void -put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(1000); + feedbm::PendingTracker pending_tracker(max_pending); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; @@ -1039,8 +1043,8 @@ run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { put_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { put_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1067,10 +1071,10 @@ make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu } void -update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(100); + feedbm::PendingTracker pending_tracker(max_pending); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; @@ -1094,8 +1098,8 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { update_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { update_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1123,10 +1127,10 @@ make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu } void -remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(100); + feedbm::PendingTracker pending_tracker(max_pending); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; @@ -1149,8 +1153,8 @@ run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { remove_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { remove_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1298,6 +1302,7 @@ App::get_options() { "client-threads", 1, nullptr, 0 }, { "indexing-sequencer", 1, nullptr, 0 }, { "documents", 1, nullptr, 0 }, + { "max-pending", 1, nullptr, 0 }, { "put-passes", 1, nullptr, 0 }, { "update-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, @@ -1313,6 +1318,7 @@ App::get_options() enum longopts_enum { LONGOPT_CLIENT_THREADS, LONGOPT_INDEXING_SEQUENCER, + LONGOPT_MAX_PENDING, LONGOPT_DOCUMENTS, LONGOPT_PUT_PASSES, LONGOPT_UPDATE_PASSES, @@ -1341,6 +1347,9 @@ App::get_options() case LONGOPT_DOCUMENTS: _bm_params.set_documents(atoi(opt_argument)); break; + case LONGOPT_MAX_PENDING: + _bm_params.set_max_pending(atoi(opt_argument)); + break; case LONGOPT_PUT_PASSES: _bm_params.set_put_passes(atoi(opt_argument)); break; |