diff options
-rw-r--r-- | searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp | 162 |
1 files changed, 134 insertions, 28 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp index 09a028d0431..ecfa2a07cef 100644 --- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp +++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp @@ -239,7 +239,9 @@ public: class BMParams { uint32_t _documents; uint32_t _threads; + uint32_t _put_passes; uint32_t _update_passes; + uint32_t _remove_passes; uint32_t get_start(uint32_t thread_id) const { return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads); } @@ -247,7 +249,9 @@ public: BMParams() : _documents(160000), _threads(32), - _update_passes(1) + _put_passes(2), + _update_passes(1), + _remove_passes(2) { } BMRange get_range(uint32_t thread_id) const { @@ -255,10 +259,14 @@ public: } uint32_t get_documents() const { return _documents; } uint32_t get_threads() const { return _threads; } + uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } + uint32_t get_remove_passes() const { return _remove_passes; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_threads(uint32_t threads_in) { _threads = threads_in; } + void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } + void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } bool check() const; }; @@ -277,6 +285,10 @@ BMParams::check() const std::cerr << "Too few documents: " << _documents << std::endl; return false; } + if (_put_passes < 1) { + std::cerr << "Put passes too low: " << _put_passes << std::endl; + return false; + } return true; } @@ -313,6 +325,7 @@ struct PersistenceProviderFixture { ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } + BucketId 
make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); } Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); } DocumentId make_document_id(uint32_t i) const; std::unique_ptr<Document> make_document(uint32_t i) const; @@ -441,30 +454,68 @@ PersistenceProviderFixture::create_buckets() } } +vespalib::nbostream +make_put_feed(PersistenceProviderFixture &f, BMRange range) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + serialized_feed << f.make_bucket_id(i); + auto document = f.make_document(i); + document->serialize(serialized_feed); + } + return serialized_feed; +} + +std::vector<vespalib::nbostream> +make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange)> func, const vespalib::string &label) +{ + LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); + std::vector<vespalib::nbostream> serialized_feed_v; + auto start_time = std::chrono::steady_clock::now(); + serialized_feed_v.resize(bm_params.get_threads()); + for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { + auto range = bm_params.get_range(i); + executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func]() + { serialized_feed_v[i] = func(range); })); + } + executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + LOG(info, "%8.2f %s data elements/s", bm_params.get_documents() / elapsed.count(), label.c_str()); + return serialized_feed_v; +} + void -put_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) +put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t 
time_bias) { LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); MyPendingTracker pending_tracker(100); auto &provider = *f._persistence_engine; auto &context = f._context; + auto &repo = *f._repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + BucketId bucket_id; + auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto bucket = f.make_bucket(i); - auto document = f.make_document(i); + is >> bucket_id; + Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + auto document = std::make_unique<Document>(repo, is); provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker)); } + assert(is.empty()); pending_tracker.drain(); } void -run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params) +run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params) { LOG(info, "putAsync %u small documents, pass=%u", bm_params.get_documents(), pass); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, range, time_bias]() - { put_async_task(f, range, time_bias); })); + executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() + { put_async_task(f, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -473,30 +524,49 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor time_bias += bm_params.get_documents(); } +vespalib::nbostream +make_update_feed(PersistenceProviderFixture &f, BMRange 
range) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + serialized_feed << f.make_bucket_id(i); + auto document_update = f.make_document_update(i); + document_update->serializeHEAD(serialized_feed); + } + return serialized_feed; +} + void -update_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) +update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); MyPendingTracker pending_tracker(100); auto &provider = *f._persistence_engine; auto &context = f._context; + auto &repo = *f._repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + BucketId bucket_id; + auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto bucket = f.make_bucket(i); - auto document_update = f.make_document_update(i); + is >> bucket_id; + Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + auto document_update = DocumentUpdate::createHEAD(repo, is); provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker)); } + assert(is.empty()); pending_tracker.drain(); } void -run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params) +run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params) { LOG(info, "updateAsync %u small documents, pass=%u", bm_params.get_documents(), pass); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < 
bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, range, time_bias]() - { update_async_task(f, range, time_bias); })); + executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() + { update_async_task(f, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -505,30 +575,49 @@ run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu time_bias += bm_params.get_documents(); } +vespalib::nbostream +make_remove_feed(PersistenceProviderFixture &f, BMRange range) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_remove_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + serialized_feed << f.make_bucket_id(i); + auto document_id = f.make_document_id(i); + vespalib::string raw_id = document_id.toString(); + serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); + } + return serialized_feed; +} + void -remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) +remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); MyPendingTracker pending_tracker(100); auto &provider = *f._persistence_engine; auto &context = f._context; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + BucketId bucket_id; + auto bucket_space = f._bucket_space; for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto bucket = f.make_bucket(i); - auto document_id = f.make_document_id(i); + is >> bucket_id; + Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0)); + DocumentId document_id(is); provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context,
std::make_unique<MyOperationComplete>(pending_tracker)); } + assert(is.empty()); pending_tracker.drain(); } void -run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams &bm_params) +run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams &bm_params) { LOG(info, "removeAsync %u small documents, pass=%u", bm_params.get_documents(), pass); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, range, time_bias]() - { remove_async_task(f, range, time_bias); })); + executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]() + { remove_async_task(f, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -547,14 +636,19 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); + auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); + auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update"); + auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_remove_feed(f, range); }, "remove"); int64_t time_bias = 1; - run_put_async_tasks(f, executor, 0, time_bias, bm_params); - run_put_async_tasks(f, executor, 1, time_bias, bm_params); + for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) { + run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params); + } for (uint32_t pass = 0; pass < bm_params.get_update_passes(); ++pass) 
{ - run_update_async_tasks(f, executor, pass, time_bias, bm_params); + run_update_async_tasks(f, executor, pass, time_bias, update_feed, bm_params); + } + for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) { + run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params); } - run_remove_async_tasks(f, executor, 0, time_bias, bm_params); - run_remove_async_tasks(f, executor, 1, time_bias, bm_params); } class App : public FastOS_Application @@ -585,8 +679,10 @@ App::usage() std::cerr << "vespa-spi-feed-bm\n" "[--threads threads]\n" - "[--documents documents]" - "[--update-passes update-passes]" << std::endl; + "[--documents documents]\n" + "[--put-passes put-passes]\n" + "[--update-passes update-passes]\n" + "[--remove-passes remove-passes]" << std::endl; } bool @@ -598,12 +694,16 @@ App::get_options() static struct option long_opts[] = { { "threads", 1, nullptr, 0 }, { "documents", 1, nullptr, 0 }, - { "update-passes", 1, nullptr, 0 } + { "put-passes", 1, nullptr, 0 }, + { "update-passes", 1, nullptr, 0 }, + { "remove-passes", 1, nullptr, 0 } }; enum longopts_enum { LONGOPT_THREADS, LONGOPT_DOCUMENTS, - LONGOPT_UPDATE_PASSES + LONGOPT_PUT_PASSES, + LONGOPT_UPDATE_PASSES, + LONGOPT_REMOVE_PASSES }; int opt_index = 1; resetOptIndex(opt_index); @@ -617,9 +717,15 @@ App::get_options() case LONGOPT_DOCUMENTS: _bm_params.set_documents(atoi(opt_argument)); break; + case LONGOPT_PUT_PASSES: + _bm_params.set_put_passes(atoi(opt_argument)); + break; case LONGOPT_UPDATE_PASSES: _bm_params.set_update_passes(atoi(opt_argument)); break; + case LONGOPT_REMOVE_PASSES: + _bm_params.set_remove_passes(atoi(opt_argument)); + break; default: return false; } |