diff options
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp | 68 |
1 files changed, 64 insertions, 4 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp index 6f52b58cb86..b1ced9da793 100644 --- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp +++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp @@ -12,6 +12,8 @@ #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/searchcommon/common/schemaconfigurer.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/matching/querylimiter.h> @@ -50,6 +52,7 @@ using namespace vespa::config::search; using namespace std::chrono_literals; using vespa::config::content::core::BucketspacesConfig; +using document::AssignValueUpdate; using document::BucketId; using document::BucketSpace; using document::Document; @@ -57,7 +60,9 @@ using document::DocumentId; using document::DocumentType; using document::DocumentTypeRepo; using document::DocumenttypesConfig; +using document::DocumentUpdate; using document::Field; +using document::FieldUpdate; using document::IntFieldValue; using document::test::makeBucketSpace; using search::TuneFileDocumentDB; @@ -233,13 +238,15 @@ public: class BMParams { uint32_t _documents; uint32_t _threads; + uint32_t _update_passes; uint32_t get_start(uint32_t thread_id) const { return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads); } public: BMParams() : _documents(160000), - _threads(32) + _threads(32), + _update_passes(1) { } BMRange get_range(uint32_t thread_id) const { @@ -247,8 +254,10 @@ public: } uint32_t get_documents() const { return _documents; } uint32_t get_threads() const { return _threads; } + uint32_t get_update_passes() const { return _update_passes; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_threads(uint32_t threads_in) { _threads = threads_in; } + void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } bool check() const; }; @@ -306,6 +315,7 @@ struct PersistenceProviderFixture { Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); } DocumentId make_document_id(uint32_t i) const; std::unique_ptr<Document> make_document(uint32_t i) const; + std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; void create_buckets(); }; @@ -412,6 +422,15 @@ PersistenceProviderFixture::make_document(uint32_t i) const return document; } +std::unique_ptr<DocumentUpdate> +PersistenceProviderFixture::make_document_update(uint32_t i) const +{ + auto id = make_document_id(i); + auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); + document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); + return document_update; +} + void PersistenceProviderFixture::create_buckets() { @@ -454,6 +473,38 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor } void +update_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) +{ + LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); + MyPendingTracker pending_tracker(100); + auto &provider = *f._persistence_engine; + auto &context = f._context; + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + auto bucket = f.make_bucket(i); + auto document_update = f.make_document_update(i); + provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker)); + } + pending_tracker.drain(); +} + +void +run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params) +{ + LOG(info, "updateAsync %u small documents, pass=%u", bm_params.get_documents(), pass); + auto start_time = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { + auto range = bm_params.get_range(i); + executor.execute(makeLambdaTask([&f, range, time_bias]() + { update_async_task(f, range, time_bias); })); + } + executor.sync(); + auto end_time = std::chrono::steady_clock::now(); + std::chrono::duration<double> elapsed = end_time - start_time; + LOG(info, "%8.2f updates/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + time_bias += bm_params.get_documents(); +} + +void remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias) { LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); @@ -498,6 +549,9 @@ void benchmark_async_spi(const BMParams &bm_params) int64_t time_bias = 1; run_put_async_tasks(f, executor, 0, time_bias, bm_params); run_put_async_tasks(f, executor, 1, time_bias, bm_params); + for (uint32_t pass = 0; pass < bm_params.get_update_passes(); ++pass) { + run_update_async_tasks(f, executor, pass, time_bias, bm_params); + } run_remove_async_tasks(f, executor, 0, time_bias, bm_params); run_remove_async_tasks(f, executor, 1, time_bias, bm_params); } @@ -530,7 +584,8 @@ App::usage() std::cerr << "vespa-spi-feed-bm\n" "[--threads threads]\n" - "[--documents documents]" << std::endl; + "[--documents documents]" + "[--update-passes update-passes]" << std::endl; } bool @@ -541,11 +596,13 @@ App::get_options() int long_opt_index = 0; static struct option long_opts[] = { { "threads", 1, nullptr, 0 }, - { "documents", 1, nullptr, 0 } + { "documents", 1, nullptr, 0 }, + { "update-passes", 1, nullptr, 0 } }; enum longopts_enum { LONGOPT_THREADS, - LONGOPT_DOCUMENTS + LONGOPT_DOCUMENTS, + LONGOPT_UPDATE_PASSES }; int opt_index = 1; resetOptIndex(opt_index); @@ -559,6 +616,9 @@ App::get_options() case LONGOPT_DOCUMENTS: _bm_params.set_documents(atoi(opt_argument)); break; + case LONGOPT_UPDATE_PASSES: + _bm_params.set_update_passes(atoi(opt_argument)); + break; default: return false; } |