summary refs log tree commit diff stats
path: root/searchcore/src
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@broadpark.no> 2020-09-29 00:28:58 +0200
committer Tor Egge <Tor.Egge@broadpark.no> 2020-09-29 00:33:39 +0200
commit 480b966594e48387818239911949d807aed523d3 (patch)
tree 1d4ec5851706304901efe15826604415339cbff8 /searchcore/src
parent 243ba9ab96b9ef4be55442641c6b8472994e6e40 (diff)
Avoid overlapping buckets between bm feed threads.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp 73
1 files changed, 47 insertions, 26 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index b89bcdbce52..2456cc0ec47 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -195,6 +195,23 @@ struct MyResourceWriteFilter : public IResourceWriteFilter
State getAcceptState() const override { return IResourceWriteFilter::State(); }
};
+class BucketSelector
+{
+ uint32_t _thread_id;
+ uint32_t _threads;
+ uint32_t _num_buckets;
+public:
+ BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in)
+ : _thread_id(thread_id_in),
+ _threads(threads_in),
+ _num_buckets((num_buckets_in / _threads) * _threads)
+ {
+ }
+ uint64_t operator()(uint32_t i) {
+ return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
+ }
+};
+
class BMRange
{
uint32_t _start;
@@ -550,11 +567,11 @@ struct PersistenceProviderFixture {
~PersistenceProviderFixture();
void create_document_db();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
- BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); }
- document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); }
- DocumentId make_document_id(uint32_t i) const;
- std::unique_ptr<Document> make_document(uint32_t i) const;
- std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
+ BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); }
+ document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); }
+ DocumentId make_document_id(uint32_t n, uint32_t i) const;
+ std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const;
+ std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const;
void create_buckets();
void start_service_layer(const BMParams& params);
void start_distributor(const BMParams& params);
@@ -673,16 +690,16 @@ PersistenceProviderFixture::create_document_db()
}
DocumentId
-PersistenceProviderFixture::make_document_id(uint32_t i) const
+PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const
{
- DocumentId id(vespalib::make_string("id::test:n=%u:%u", i & (num_buckets() - 1), i));
+ DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i));
return id;
}
std::unique_ptr<Document>
-PersistenceProviderFixture::make_document(uint32_t i) const
+PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const
{
- auto id = make_document_id(i);
+ auto id = make_document_id(n, i);
auto document = std::make_unique<Document>(*_document_type, id);
document->setRepo(*_repo);
document->setFieldValue(_field, std::make_unique<IntFieldValue>(i));
@@ -690,9 +707,9 @@ PersistenceProviderFixture::make_document(uint32_t i) const
}
std::unique_ptr<DocumentUpdate>
-PersistenceProviderFixture::make_document_update(uint32_t i) const
+PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const
{
- auto id = make_document_id(i);
+ auto id = make_document_id(n, i);
auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id);
document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15))));
return document_update;
@@ -790,20 +807,21 @@ PersistenceProviderFixture::shutdown_service_layer()
}
vespalib::nbostream
-make_put_feed(PersistenceProviderFixture &f, BMRange range)
+make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
{
vespalib::nbostream serialized_feed;
LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end());
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- serialized_feed << f.make_bucket_id(i);
- auto document = f.make_document(i);
+ auto n = bucket_selector(i);
+ serialized_feed << f.make_bucket_id(n);
+ auto document = f.make_document(n, i);
document->serialize(serialized_feed);
}
return serialized_feed;
}
std::vector<vespalib::nbostream>
-make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange)> func, const vespalib::string &label)
+make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
{
LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents());
std::vector<vespalib::nbostream> serialized_feed_v;
@@ -811,8 +829,9 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st
serialized_feed_v.resize(bm_params.get_threads());
for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func]()
- { serialized_feed_v[i] = func(range); }));
+ BucketSelector bucket_selector(i, bm_params.get_threads(), num_buckets);
+ executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]()
+ { serialized_feed_v[i] = func(range, bucket_selector); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -860,13 +879,14 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor
}
vespalib::nbostream
-make_update_feed(PersistenceProviderFixture &f, BMRange range)
+make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
{
vespalib::nbostream serialized_feed;
LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- serialized_feed << f.make_bucket_id(i);
- auto document_update = f.make_document_update(i);
+ auto n = bucket_selector(i);
+ serialized_feed << f.make_bucket_id(n);
+ auto document_update = f.make_document_update(n, i);
document_update->serializeHEAD(serialized_feed);
}
return serialized_feed;
@@ -911,13 +931,14 @@ run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu
}
vespalib::nbostream
-make_remove_feed(PersistenceProviderFixture &f, BMRange range)
+make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
{
vespalib::nbostream serialized_feed;
LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- serialized_feed << f.make_bucket_id(i);
- auto document_id = f.make_document_id(i);
+ auto n = bucket_selector(i);
+ serialized_feed << f.make_bucket_id(n);
+ auto document_id = f.make_document_id(n, i);
vespalib::string raw_id = document_id.toString();
serialized_feed.write(raw_id.c_str(), raw_id.size() + 1);
}
@@ -978,9 +999,9 @@ void benchmark_async_spi(const BMParams &bm_params)
}
f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
- auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
- auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update");
- auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_remove_feed(f, range); }, "remove");
+ auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put");
+ auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update");
+ auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove");
int64_t time_bias = 1;
for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) {
run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params);