aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-03 20:21:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-03 20:21:33 +0000
commit3649e82b9df2b461981274be42419c35dd955913 (patch)
treea8ac75752176e4c33b0994cb2f24ed951f7eeafe
parent5d8748786cd3db3b4cdc983c83bf65005dcdd149 (diff)
Add maxpending which by default is 1000 to control max inflight.
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp39
1 files changed, 24 insertions, 15 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 26e07e7901a..2394f918c54 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -237,7 +237,7 @@ public:
_num_buckets((num_buckets_in / _threads) * _threads)
{
}
- uint64_t operator()(uint32_t i) {
+ uint64_t operator()(uint32_t i) const {
return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
}
};
@@ -265,6 +265,7 @@ class BMParams {
uint32_t _remove_passes;
uint32_t _rpc_network_threads;
uint32_t _response_threads;
+ uint32_t _max_pending;
bool _enable_distributor;
bool _enable_service_layer;
bool _use_document_api;
@@ -284,6 +285,7 @@ public:
_remove_passes(2),
_rpc_network_threads(1), // Same default as in stor-communicationmanager.def
_response_threads(2), // Same default as in stor-filestor.def
+ _max_pending(1000),
_enable_distributor(false),
_enable_service_layer(false),
_use_document_api(false),
@@ -296,6 +298,7 @@ public:
return BMRange(get_start(thread_id), get_start(thread_id + 1));
}
uint32_t get_documents() const { return _documents; }
+ uint32_t get_max_pending() const { return _max_pending; }
uint32_t get_client_threads() const { return _client_threads; }
const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_put_passes() const { return _put_passes; }
@@ -309,6 +312,7 @@ public:
bool get_use_storage_chain() const { return _use_storage_chain; }
bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
+ void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
@@ -318,7 +322,7 @@ public:
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
- void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; }
+ void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in; }
void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; }
void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; }
@@ -639,7 +643,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<storage::DistributorProcess> _distributor;
std::unique_ptr<BmMessageBus> _message_bus;
- PersistenceProviderFixture(const BMParams& params);
+ explicit PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
void create_document_db(const BMParams & params);
uint32_t num_buckets() const { return (1u << _bucket_bits); }
@@ -998,10 +1002,10 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st
}
void
-put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(1000);
+ feedbm::PendingTracker pending_tracker(max_pending);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
@@ -1039,8 +1043,8 @@ run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { put_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { put_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1067,10 +1071,10 @@ make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu
}
void
-update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(100);
+ feedbm::PendingTracker pending_tracker(max_pending);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
@@ -1094,8 +1098,8 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { update_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { update_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1123,10 +1127,10 @@ make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu
}
void
-remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(100);
+ feedbm::PendingTracker pending_tracker(max_pending);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
@@ -1149,8 +1153,8 @@ run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { remove_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { remove_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1298,6 +1302,7 @@ App::get_options()
{ "client-threads", 1, nullptr, 0 },
{ "indexing-sequencer", 1, nullptr, 0 },
{ "documents", 1, nullptr, 0 },
+ { "max-pending", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
{ "update-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
@@ -1313,6 +1318,7 @@ App::get_options()
enum longopts_enum {
LONGOPT_CLIENT_THREADS,
LONGOPT_INDEXING_SEQUENCER,
+ LONGOPT_MAX_PENDING,
LONGOPT_DOCUMENTS,
LONGOPT_PUT_PASSES,
LONGOPT_UPDATE_PASSES,
@@ -1341,6 +1347,9 @@ App::get_options()
case LONGOPT_DOCUMENTS:
_bm_params.set_documents(atoi(opt_argument));
break;
+ case LONGOPT_MAX_PENDING:
+ _bm_params.set_max_pending(atoi(opt_argument));
+ break;
case LONGOPT_PUT_PASSES:
_bm_params.set_put_passes(atoi(opt_argument));
break;