aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-03 23:05:08 +0200
committerGitHub <noreply@github.com>2020-10-03 23:05:08 +0200
commit796064ded3abf1afd8a44606ccf71043a03e9cac (patch)
treed9fd8ab608343f3d907b904b5ad7ac7d2f73b582
parent3bed313ade23e3ce03e65b8d78f14cb08f854bfc (diff)
parentc93a6fa688f5f6d4742f49153be9680f78481a65 (diff)
Merge pull request #14697 from vespa-engine/balder/add-max-pending
Add maxpending which by default is 1000 to control max inflight.
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp90
1 files changed, 50 insertions, 40 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 26e07e7901a..ba6c3b72d1e 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -237,7 +237,7 @@ public:
_num_buckets((num_buckets_in / _threads) * _threads)
{
}
- uint64_t operator()(uint32_t i) {
+ uint64_t operator()(uint32_t i) const {
return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
}
};
@@ -265,6 +265,7 @@ class BMParams {
uint32_t _remove_passes;
uint32_t _rpc_network_threads;
uint32_t _response_threads;
+ uint32_t _max_pending;
bool _enable_distributor;
bool _enable_service_layer;
bool _use_document_api;
@@ -284,6 +285,7 @@ public:
_remove_passes(2),
_rpc_network_threads(1), // Same default as in stor-communicationmanager.def
_response_threads(2), // Same default as in stor-filestor.def
+ _max_pending(1000),
_enable_distributor(false),
_enable_service_layer(false),
_use_document_api(false),
@@ -296,6 +298,7 @@ public:
return BMRange(get_start(thread_id), get_start(thread_id + 1));
}
uint32_t get_documents() const { return _documents; }
+ uint32_t get_max_pending() const { return _max_pending; }
uint32_t get_client_threads() const { return _client_threads; }
const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_put_passes() const { return _put_passes; }
@@ -309,6 +312,7 @@ public:
bool get_use_storage_chain() const { return _use_storage_chain; }
bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
+ void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
@@ -318,7 +322,7 @@ public:
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
- void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; }
+ void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in; }
void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; }
void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; }
@@ -639,7 +643,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<storage::DistributorProcess> _distributor;
std::unique_ptr<BmMessageBus> _message_bus;
- PersistenceProviderFixture(const BMParams& params);
+ explicit PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
void create_document_db(const BMParams & params);
uint32_t num_buckets() const { return (1u << _bucket_bits); }
@@ -998,10 +1002,10 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st
}
void
-put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(1000);
+ feedbm::PendingTracker pending_tracker(max_pending);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
@@ -1039,8 +1043,8 @@ run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { put_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { put_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1067,10 +1071,10 @@ make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu
}
void
-update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(100);
+ feedbm::PendingTracker pending_tracker(max_pending);
auto &repo = *f._repo;
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
@@ -1094,8 +1098,8 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { update_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { update_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1123,10 +1127,10 @@ make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bu
}
void
-remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(100);
+ feedbm::PendingTracker pending_tracker(max_pending);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
BucketId bucket_id;
auto bucket_space = f._bucket_space;
@@ -1149,8 +1153,8 @@ run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { remove_async_task(f, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { remove_async_task(f, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -1274,6 +1278,7 @@ App::usage()
"vespa-feed-bm\n"
"[--client-threads threads]\n"
"[--indexing-sequencer [latency,throughput,adaptive]]\n"
+ "[--max-pending max-pending]\n"
"[--documents documents]\n"
"[--put-passes put-passes]\n"
"[--update-passes update-passes]\n"
@@ -1296,35 +1301,37 @@ App::get_options()
int long_opt_index = 0;
static struct option long_opts[] = {
{ "client-threads", 1, nullptr, 0 },
- { "indexing-sequencer", 1, nullptr, 0 },
{ "documents", 1, nullptr, 0 },
+ { "enable-distributor", 0, nullptr, 0 },
+ { "enable-service-layer", 0, nullptr, 0 },
+ { "indexing-sequencer", 1, nullptr, 0 },
+ { "max-pending", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
{ "update-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
- { "rpc-network-threads", 1, nullptr, 0 },
{ "response-threads", 1, nullptr, 0 },
- { "enable-distributor", 0, nullptr, 0 },
- { "enable-service-layer", 0, nullptr, 0 },
+ { "rpc-network-threads", 1, nullptr, 0 },
{ "use-document-api", 0, nullptr, 0 },
+ { "use-legacy-bucket-db", 0, nullptr, 0 },
{ "use-message-bus", 0, nullptr, 0 },
- { "use-storage-chain", 0, nullptr, 0 },
- { "use-legacy-bucket-db", 0, nullptr, 0 }
+ { "use-storage-chain", 0, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_CLIENT_THREADS,
- LONGOPT_INDEXING_SEQUENCER,
LONGOPT_DOCUMENTS,
+ LONGOPT_ENABLE_DISTRIBUTOR,
+ LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_INDEXING_SEQUENCER,
+ LONGOPT_MAX_PENDING,
LONGOPT_PUT_PASSES,
LONGOPT_UPDATE_PASSES,
LONGOPT_REMOVE_PASSES,
- LONGOPT_RPC_NETWORK_THREADS,
LONGOPT_RESPONSE_THREADS,
- LONGOPT_ENABLE_DISTRIBUTOR,
- LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_RPC_NETWORK_THREADS,
LONGOPT_USE_DOCUMENT_API,
+ LONGOPT_USE_LEGACY_BUCKET_DB,
LONGOPT_USE_MESSAGE_BUS,
- LONGOPT_USE_STORAGE_CHAIN,
- LONGOPT_USE_LEGACY_BUCKET_DB
+ LONGOPT_USE_STORAGE_CHAIN
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -1335,11 +1342,20 @@ App::get_options()
case LONGOPT_CLIENT_THREADS:
_bm_params.set_client_threads(atoi(opt_argument));
break;
+ case LONGOPT_DOCUMENTS:
+ _bm_params.set_documents(atoi(opt_argument));
+ break;
+ case LONGOPT_ENABLE_DISTRIBUTOR:
+ _bm_params.set_enable_distributor(true);
+ break;
+ case LONGOPT_ENABLE_SERVICE_LAYER:
+ _bm_params.set_enable_service_layer(true);
+ break;
case LONGOPT_INDEXING_SEQUENCER:
_bm_params.set_indexing_sequencer(opt_argument);
break;
- case LONGOPT_DOCUMENTS:
- _bm_params.set_documents(atoi(opt_argument));
+ case LONGOPT_MAX_PENDING:
+ _bm_params.set_max_pending(atoi(opt_argument));
break;
case LONGOPT_PUT_PASSES:
_bm_params.set_put_passes(atoi(opt_argument));
@@ -1350,30 +1366,24 @@ App::get_options()
case LONGOPT_REMOVE_PASSES:
_bm_params.set_remove_passes(atoi(opt_argument));
break;
- case LONGOPT_RPC_NETWORK_THREADS:
- _bm_params.set_rpc_network_threads(atoi(opt_argument));
- break;
case LONGOPT_RESPONSE_THREADS:
_bm_params.set_response_threads(atoi(opt_argument));
break;
- case LONGOPT_ENABLE_DISTRIBUTOR:
- _bm_params.set_enable_distributor(true);
- break;
- case LONGOPT_ENABLE_SERVICE_LAYER:
- _bm_params.set_enable_service_layer(true);
+ case LONGOPT_RPC_NETWORK_THREADS:
+ _bm_params.set_rpc_network_threads(atoi(opt_argument));
break;
case LONGOPT_USE_DOCUMENT_API:
_bm_params.set_use_document_api(true);
break;
+ case LONGOPT_USE_LEGACY_BUCKET_DB:
+ _bm_params.set_use_legacy_bucket_db(true);
+ break;
case LONGOPT_USE_MESSAGE_BUS:
_bm_params.set_use_message_bus(true);
break;
case LONGOPT_USE_STORAGE_CHAIN:
_bm_params.set_use_storage_chain(true);
break;
- case LONGOPT_USE_LEGACY_BUCKET_DB:
- _bm_params.set_use_legacy_bucket_db(true);
- break;
default:
return false;
}