diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-29 00:28:58 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-29 00:33:39 +0200 |
commit | 480b966594e48387818239911949d807aed523d3 (patch) | |
tree | 1d4ec5851706304901efe15826604415339cbff8 /searchcore/src | |
parent | 243ba9ab96b9ef4be55442641c6b8472994e6e40 (diff) |
Avoid overlapping buckets between bm feed threads.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 73 |
1 file changed, 47 insertions, 26 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index b89bcdbce52..2456cc0ec47 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -195,6 +195,23 @@ struct MyResourceWriteFilter : public IResourceWriteFilter State getAcceptState() const override { return IResourceWriteFilter::State(); } }; +class BucketSelector +{ + uint32_t _thread_id; + uint32_t _threads; + uint32_t _num_buckets; +public: + BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in) + : _thread_id(thread_id_in), + _threads(threads_in), + _num_buckets((num_buckets_in / _threads) * _threads) + { + } + uint64_t operator()(uint32_t i) { + return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; + } +}; + class BMRange { uint32_t _start; @@ -550,11 +567,11 @@ struct PersistenceProviderFixture { ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } - BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); } - document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); } - DocumentId make_document_id(uint32_t i) const; - std::unique_ptr<Document> make_document(uint32_t i) const; - std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; + BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); } + document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); } + DocumentId make_document_id(uint32_t n, uint32_t i) const; + std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const; + std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; void create_buckets(); void start_service_layer(const 
BMParams& params); void start_distributor(const BMParams& params); @@ -673,16 +690,16 @@ PersistenceProviderFixture::create_document_db() } DocumentId -PersistenceProviderFixture::make_document_id(uint32_t i) const +PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const { - DocumentId id(vespalib::make_string("id::test:n=%u:%u", i & (num_buckets() - 1), i)); + DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i)); return id; } std::unique_ptr<Document> -PersistenceProviderFixture::make_document(uint32_t i) const +PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const { - auto id = make_document_id(i); + auto id = make_document_id(n, i); auto document = std::make_unique<Document>(*_document_type, id); document->setRepo(*_repo); document->setFieldValue(_field, std::make_unique<IntFieldValue>(i)); @@ -690,9 +707,9 @@ PersistenceProviderFixture::make_document(uint32_t i) const } std::unique_ptr<DocumentUpdate> -PersistenceProviderFixture::make_document_update(uint32_t i) const +PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const { - auto id = make_document_id(i); + auto id = make_document_id(n, i); auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); return document_update; @@ -790,20 +807,21 @@ PersistenceProviderFixture::shutdown_service_layer() } vespalib::nbostream -make_put_feed(PersistenceProviderFixture &f, BMRange range) +make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document = f.make_document(i); + auto n = bucket_selector(i); + serialized_feed << 
f.make_bucket_id(n); + auto document = f.make_document(n, i); document->serialize(serialized_feed); } return serialized_feed; } std::vector<vespalib::nbostream> -make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange)> func, const vespalib::string &label) +make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) { LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); std::vector<vespalib::nbostream> serialized_feed_v; @@ -811,8 +829,9 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st serialized_feed_v.resize(bm_params.get_threads()); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func]() - { serialized_feed_v[i] = func(range); })); + BucketSelector bucket_selector(i, bm_params.get_threads(), num_buckets); + executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]() + { serialized_feed_v[i] = func(range, bucket_selector); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -860,13 +879,14 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor } vespalib::nbostream -make_update_feed(PersistenceProviderFixture &f, BMRange range) +make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document_update = f.make_document_update(i); + auto n = bucket_selector(i); + serialized_feed << f.make_bucket_id(n); + auto document_update = 
f.make_document_update(n, i); document_update->serializeHEAD(serialized_feed); } return serialized_feed; @@ -911,13 +931,14 @@ run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu } vespalib::nbostream -make_remove_feed(PersistenceProviderFixture &f, BMRange range) +make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document_id = f.make_document_id(i); + auto n = bucket_selector(i); + serialized_feed << f.make_bucket_id(n); + auto document_id = f.make_document_id(n, i); vespalib::string raw_id = document_id.toString(); serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); } @@ -978,9 +999,9 @@ void benchmark_async_spi(const BMParams &bm_params) } f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); - auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); - auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update"); - auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_remove_feed(f, range); }, "remove"); + auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); + auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update"); + auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove"); 
int64_t time_bias = 1; for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) { run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params); |