diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-18 14:27:03 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-18 14:27:03 +0200 |
commit | 63291d89df02bb2c6a0ff1ec95124b7d746410a1 (patch) | |
tree | 2efd013bcc5392d97a53f6ecb7c39c5a45eecba8 | |
parent | 6411525f438f153946af1cbabc0f1a33178c9bcf (diff) |
Add options to vespa spi feed benchmark.
-rw-r--r-- | searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp | 208 |
1 files changed, 168 insertions, 40 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp index d2c8e00aa81..6f52b58cb86 100644 --- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp +++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp @@ -34,6 +34,9 @@ #include <vespa/config-indexschema.h> #include <vespa/config-summary.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/fastos/app.h> +#include <getopt.h> +#include <iostream> #include <vespa/log/log.h> LOG_SETUP("vespa-spi-feed-bm"); @@ -213,6 +216,60 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result (void) resultHandler; } +class BMRange +{ + uint32_t _start; + uint32_t _end; +public: + BMRange(uint32_t start_in, uint32_t end_in) + : _start(start_in), + _end(end_in) + { + } + uint32_t get_start() const { return _start; } + uint32_t get_end() const { return _end; } +}; + +class BMParams { + uint32_t _documents; + uint32_t _threads; + uint32_t get_start(uint32_t thread_id) const { + return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads); + } +public: + BMParams() + : _documents(160000), + _threads(32) + { + } + BMRange get_range(uint32_t thread_id) const { + return BMRange(get_start(thread_id), get_start(thread_id + 1)); + } + uint32_t get_documents() const { return _documents; } + uint32_t get_threads() const { return _threads; } + void set_documents(uint32_t documents_in) { _documents = documents_in; } + void set_threads(uint32_t threads_in) { _threads = threads_in; } + bool check() const; +}; + +bool +BMParams::check() const +{ + if (_threads < 1) { + std::cerr << "Too few threads: " << _threads << std::endl; + return false; + } + if (_threads > 256) { + std::cerr << "Too many threads: " << _threads << std::endl; + return false; + } + if (_documents < _threads) { + std::cerr << "Too few documents: " << _documents << std::endl; + return false; + } + return true; +} + } @@ -241,17 +298,12 @@ struct PersistenceProviderFixture { std::shared_ptr<PersistenceEngine> _persistence_engine; storage::spi::Context _context; uint32_t _bucket_bits; - + PersistenceProviderFixture(); ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); } - BucketId get_bucket_id(const Document &document) const { - auto &id = document.getId(); - auto &gid = id.getGlobalId(); - return BucketId(_bucket_bits, gid.convertToBucketId().getRawId()).stripUnused(); - } DocumentId make_document_id(uint32_t i) const; std::unique_ptr<Document> make_document(uint32_t i) const; void create_buckets(); @@ -370,13 +422,13 @@ PersistenceProviderFixture::create_buckets() } void -put_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, int64_t time_bias) +put_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) { + LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); MyPendingTracker pending_tracker(100); auto &provider = *f._persistence_engine; auto &context = f._context; - uint32_t end = start + count; - for (unsigned int i = start; i < end; ++i) { + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { auto bucket = f.make_bucket(i); auto document = f.make_document(i); provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker)); @@ -385,31 +437,30 @@ put_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, in } void -run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias) +run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params) { - uint32_t tasks = 32; - uint32_t per_task = 5000; - LOG(info, "putAsync %u small documents, pass=%u", tasks * per_task, pass); + LOG(info, "putAsync %u small documents, pass=%u", bm_params.get_documents(), pass); auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < tasks; ++i) { - executor.execute(makeLambdaTask([&f, i, per_task, time_bias]() - { put_async_task(f, i * per_task, per_task, time_bias); })); + for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { + auto range = bm_params.get_range(i); + executor.execute(makeLambdaTask([&f, range, time_bias]() + { put_async_task(f, range, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f puts/s for pass=%u", (tasks * per_task) / elapsed.count(), pass); - time_bias += tasks * per_task; + LOG(info, "%8.2f puts/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + time_bias += bm_params.get_documents(); } void -remove_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, int64_t time_bias) +remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) { + LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); MyPendingTracker pending_tracker(100); auto &provider = *f._persistence_engine; auto &context = f._context; - uint32_t end = start + count; - for (unsigned int i = start; i < end; ++i) { + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { auto bucket = f.make_bucket(i); auto document_id = f.make_document_id(i); provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker)); @@ -418,24 +469,23 @@ remove_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, } void -run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias) +run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams &bm_params) { - uint32_t tasks = 32; - uint32_t per_task = 5000; - LOG(info, "removeAsync %u small documents, pass=%u", tasks * per_task, pass); + LOG(info, "removeAsync %u small documents, pass=%u", bm_params.get_documents(), pass); auto start_time = std::chrono::steady_clock::now(); - for (uint32_t i = 0; i < tasks; ++i) { - executor.execute(makeLambdaTask([&f, i, per_task, time_bias]() - { remove_async_task(f, i * per_task, per_task, time_bias); })); + for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { + auto range = bm_params.get_range(i); + executor.execute(makeLambdaTask([&f, range, time_bias]() + { remove_async_task(f, range, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f removes/s for pass=%u", (tasks * per_task) / elapsed.count(), pass); - time_bias += tasks * per_task; + LOG(info, "%8.2f removes/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + time_bias += bm_params.get_documents(); } -void benchmark_async_spi() +void benchmark_async_spi(const BMParams &bm_params) { vespalib::rmdir(base_dir, true); PersistenceProviderFixture f; @@ -444,21 +494,99 @@ void benchmark_async_spi() provider.initialize(); LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); - vespalib::ThreadStackExecutor executor(32, 128 * 1024); + vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); int64_t time_bias = 1; - run_put_async_tasks(f, executor, 0, time_bias); - run_put_async_tasks(f, executor, 1, time_bias); - run_remove_async_tasks(f, executor, 0, time_bias); - run_remove_async_tasks(f, executor, 1, time_bias); + run_put_async_tasks(f, executor, 0, time_bias, bm_params); + run_put_async_tasks(f, executor, 1, time_bias, bm_params); + run_remove_async_tasks(f, executor, 0, time_bias, bm_params); + run_remove_async_tasks(f, executor, 1, time_bias, bm_params); +} + +class App : public FastOS_Application +{ + BMParams _bm_params; +public: + App(); + ~App() override; + void usage(); + bool get_options(); + int Main() override; +}; + +App::App() + : _bm_params() +{ +} + +App::~App() = default; + +void +App::usage() +{ + std::cerr << + "vespa-spi-feed-bm version 0.0\n" + "\n" + "USAGE:\n"; + std::cerr << + "vespa-spi-feed-bm\n" + "[--threads threads]\n" + "[--documents documents]" << std::endl; +} + +bool +App::get_options() +{ + int c; + const char *opt_argument = nullptr; + int long_opt_index = 0; + static struct option long_opts[] = { + { "threads", 1, nullptr, 0 }, + { "documents", 1, nullptr, 0 } + }; + enum longopts_enum { + LONGOPT_THREADS, + LONGOPT_DOCUMENTS + }; + int opt_index = 1; + resetOptIndex(opt_index); + while ((c = GetOptLong("", opt_argument, opt_index, long_opts, &long_opt_index)) != -1) { + switch (c) { + case 0: + switch(long_opt_index) { + case LONGOPT_THREADS: + _bm_params.set_threads(atoi(opt_argument)); + break; + case LONGOPT_DOCUMENTS: + _bm_params.set_documents(atoi(opt_argument)); + break; + default: + return false; + } + break; + default: + return false; + } + } + return _bm_params.check(); +} + +int +App::Main() +{ + if (!get_options()) { + usage(); + return 1; + } + benchmark_async_spi(_bm_params); + return 0; } int main(int argc, char* argv[]) { - (void) argc; - (void) argv; DummyFileHeaderContext::setCreator("vespa-spi-feed-bm"); - benchmark_async_spi(); + App app; + auto exit_value = app.Entry(argc, argv); vespalib::rmdir(base_dir, true); - return 0; + return exit_value; } |