diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-03 23:05:08 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-03 23:05:08 +0200 |
commit | 796064ded3abf1afd8a44606ccf71043a03e9cac (patch) | |
tree | d9fd8ab608343f3d907b904b5ad7ac7d2f73b582 | |
parent | 3bed313ade23e3ce03e65b8d78f14cb08f854bfc (diff) | |
parent | c93a6fa688f5f6d4742f49153be9680f78481a65 (diff) |
Merge pull request #14697 from vespa-engine/balder/add-max-pending
Add maxpending which by default is 1000 to control max inflight.
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 90 |
1 files changed, 50 insertions, 40 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 26e07e7901a..ba6c3b72d1e 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -237,7 +237,7 @@ public: _num_buckets((num_buckets_in / _threads) * _threads) { } - uint64_t operator()(uint32_t i) { + uint64_t operator()(uint32_t i) const { return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; } }; @@ -265,6 +265,7 @@ class BMParams { uint32_t _remove_passes; uint32_t _rpc_network_threads; uint32_t _response_threads; + uint32_t _max_pending; bool _enable_distributor; bool _enable_service_layer; bool _use_document_api; @@ -284,6 +285,7 @@ public: _remove_passes(2), _rpc_network_threads(1), // Same default as in stor-communicationmanager.def _response_threads(2), // Same default as in stor-filestor.def + _max_pending(1000), _enable_distributor(false), _enable_service_layer(false), _use_document_api(false), @@ -296,6 +298,7 @@ public: return BMRange(get_start(thread_id), get_start(thread_id + 1)); } uint32_t get_documents() const { return _documents; } + uint32_t get_max_pending() const { return _max_pending; } uint32_t get_client_threads() const { return _client_threads; } const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_put_passes() const { return _put_passes; } @@ -309,6 +312,7 @@ public: bool get_use_storage_chain() const { return _use_storage_chain; } bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; } void set_documents(uint32_t documents_in) { _documents = documents_in; } + void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } @@ -318,7 +322,7 @@ public: void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } - void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; } + void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in; } void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; } @@ -639,7 +643,7 @@ struct PersistenceProviderFixture { std::unique_ptr<storage::DistributorProcess> _distributor; std::unique_ptr<BmMessageBus> _message_bus; - PersistenceProviderFixture(const BMParams& params); + explicit PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); void create_document_db(const BMParams & params); uint32_t num_buckets() const { return (1u << _bucket_bits); } @@ -998,10 +1002,10 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st } void -put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(1000); + feedbm::PendingTracker pending_tracker(max_pending); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; @@ -1039,8 +1043,8 @@ run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { put_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { put_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1067,10 +1071,10 @@ make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu } void -update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(100); + feedbm::PendingTracker pending_tracker(max_pending); auto &repo = *f._repo; vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; @@ -1094,8 +1098,8 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { update_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { update_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1123,10 +1127,10 @@ make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu } void -remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(100); + feedbm::PendingTracker pending_tracker(max_pending); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); BucketId bucket_id; auto bucket_space = f._bucket_space; @@ -1149,8 +1153,8 @@ run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() - { remove_async_task(f, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { remove_async_task(f, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -1274,6 +1278,7 @@ App::usage() "vespa-feed-bm\n" "[--client-threads threads]\n" "[--indexing-sequencer [latency,throughput,adaptive]]\n" + "[--max-pending max-pending]\n" "[--documents documents]\n" "[--put-passes put-passes]\n" "[--update-passes update-passes]\n" @@ -1296,35 +1301,37 @@ App::get_options() int long_opt_index = 0; static struct option long_opts[] = { { "client-threads", 1, nullptr, 0 }, - { "indexing-sequencer", 1, nullptr, 0 }, { "documents", 1, nullptr, 0 }, + { "enable-distributor", 0, nullptr, 0 }, + { "enable-service-layer", 0, nullptr, 0 }, + { "indexing-sequencer", 1, nullptr, 0 }, + { "max-pending", 1, nullptr, 0 }, { "put-passes", 1, nullptr, 0 }, { "update-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, - { "rpc-network-threads", 1, nullptr, 0 }, { "response-threads", 1, nullptr, 0 }, - { "enable-distributor", 0, nullptr, 0 }, - { "enable-service-layer", 0, nullptr, 0 }, + { "rpc-network-threads", 1, nullptr, 0 }, { "use-document-api", 0, nullptr, 0 }, + { "use-legacy-bucket-db", 0, nullptr, 0 }, { "use-message-bus", 0, nullptr, 0 }, - { "use-storage-chain", 0, nullptr, 0 }, - { "use-legacy-bucket-db", 0, nullptr, 0 } + { "use-storage-chain", 0, nullptr, 0 } }; enum longopts_enum { LONGOPT_CLIENT_THREADS, - LONGOPT_INDEXING_SEQUENCER, LONGOPT_DOCUMENTS, + LONGOPT_ENABLE_DISTRIBUTOR, + LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_INDEXING_SEQUENCER, + LONGOPT_MAX_PENDING, LONGOPT_PUT_PASSES, LONGOPT_UPDATE_PASSES, LONGOPT_REMOVE_PASSES, - LONGOPT_RPC_NETWORK_THREADS, LONGOPT_RESPONSE_THREADS, - LONGOPT_ENABLE_DISTRIBUTOR, - LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_RPC_NETWORK_THREADS, LONGOPT_USE_DOCUMENT_API, + LONGOPT_USE_LEGACY_BUCKET_DB, LONGOPT_USE_MESSAGE_BUS, - LONGOPT_USE_STORAGE_CHAIN, - LONGOPT_USE_LEGACY_BUCKET_DB + LONGOPT_USE_STORAGE_CHAIN }; int opt_index = 1; resetOptIndex(opt_index); @@ -1335,11 +1342,20 @@ App::get_options() case LONGOPT_CLIENT_THREADS: _bm_params.set_client_threads(atoi(opt_argument)); break; + case LONGOPT_DOCUMENTS: + _bm_params.set_documents(atoi(opt_argument)); + break; + case LONGOPT_ENABLE_DISTRIBUTOR: + _bm_params.set_enable_distributor(true); + break; + case LONGOPT_ENABLE_SERVICE_LAYER: + _bm_params.set_enable_service_layer(true); + break; case LONGOPT_INDEXING_SEQUENCER: _bm_params.set_indexing_sequencer(opt_argument); break; - case LONGOPT_DOCUMENTS: - _bm_params.set_documents(atoi(opt_argument)); + case LONGOPT_MAX_PENDING: + _bm_params.set_max_pending(atoi(opt_argument)); break; case LONGOPT_PUT_PASSES: _bm_params.set_put_passes(atoi(opt_argument)); @@ -1350,30 +1366,24 @@ App::get_options() case LONGOPT_REMOVE_PASSES: _bm_params.set_remove_passes(atoi(opt_argument)); break; - case LONGOPT_RPC_NETWORK_THREADS: - _bm_params.set_rpc_network_threads(atoi(opt_argument)); - break; case LONGOPT_RESPONSE_THREADS: _bm_params.set_response_threads(atoi(opt_argument)); break; - case LONGOPT_ENABLE_DISTRIBUTOR: - _bm_params.set_enable_distributor(true); - break; - case LONGOPT_ENABLE_SERVICE_LAYER: - _bm_params.set_enable_service_layer(true); + case LONGOPT_RPC_NETWORK_THREADS: + _bm_params.set_rpc_network_threads(atoi(opt_argument)); break; case LONGOPT_USE_DOCUMENT_API: _bm_params.set_use_document_api(true); break; + case LONGOPT_USE_LEGACY_BUCKET_DB: + _bm_params.set_use_legacy_bucket_db(true); + break; case LONGOPT_USE_MESSAGE_BUS: _bm_params.set_use_message_bus(true); break; case LONGOPT_USE_STORAGE_CHAIN: _bm_params.set_use_storage_chain(true); break; - case LONGOPT_USE_LEGACY_BUCKET_DB: - _bm_params.set_use_legacy_bucket_db(true); - break; default: return false; } |