summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-19 20:25:36 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-21 10:17:33 +0200
commit1384138a797855facdd867da8eb9d118608db5c2 (patch)
tree468ff8641f6d65d80a28f134b40435cd2a73aed3 /searchcore
parent4b9e902232ba4f2e9c84eaff0dd9da4a9e34dc57 (diff)
Use serialized feed operations.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp162
1 files changed, 134 insertions, 28 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
index 09a028d0431..ecfa2a07cef 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
@@ -239,7 +239,9 @@ public:
class BMParams {
uint32_t _documents;
uint32_t _threads;
+ uint32_t _put_passes;
uint32_t _update_passes;
+ uint32_t _remove_passes;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
}
@@ -247,7 +249,9 @@ public:
BMParams()
: _documents(160000),
_threads(32),
- _update_passes(1)
+ _put_passes(2),
+ _update_passes(1),
+ _remove_passes(2)
{
}
BMRange get_range(uint32_t thread_id) const {
@@ -255,10 +259,14 @@ public:
}
uint32_t get_documents() const { return _documents; }
uint32_t get_threads() const { return _threads; }
+ uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
+ uint32_t get_remove_passes() const { return _remove_passes; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
+ void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
+ void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
bool check() const;
};
@@ -277,6 +285,10 @@ BMParams::check() const
std::cerr << "Too few documents: " << _documents << std::endl;
return false;
}
+ if (_put_passes < 1) {
+ std::cerr << "Put passes too low: " << _put_passes << std::endl;
+ return false;
+ }
return true;
}
@@ -313,6 +325,7 @@ struct PersistenceProviderFixture {
~PersistenceProviderFixture();
void create_document_db();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
+ BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); }
Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); }
DocumentId make_document_id(uint32_t i) const;
std::unique_ptr<Document> make_document(uint32_t i) const;
@@ -441,30 +454,68 @@ PersistenceProviderFixture::create_buckets()
}
}
+vespalib::nbostream
+make_put_feed(PersistenceProviderFixture &f, BMRange range)
+{
+ vespalib::nbostream serialized_feed;
+ LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ serialized_feed << f.make_bucket_id(i);
+ auto document = f.make_document(i);
+ document->serialize(serialized_feed);
+ }
+ return serialized_feed;
+}
+
+std::vector<vespalib::nbostream>
+make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange)> func, const vespalib::string &label)
+{
+ LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents());
+ std::vector<vespalib::nbostream> serialized_feed_v;
+ auto start_time = std::chrono::steady_clock::now();
+ serialized_feed_v.resize(bm_params.get_threads());
+ for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
+ auto range = bm_params.get_range(i);
+ executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func]()
+ { serialized_feed_v[i] = func(range); }));
+ }
+ executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ LOG(info, "%8.2f %s data elements/s", bm_params.get_documents() / elapsed.count(), label.c_str());
+ return serialized_feed_v;
+}
+
void
-put_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
+put_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
MyPendingTracker pending_tracker(100);
auto &provider = *f._persistence_engine;
auto &context = f._context;
+ auto &repo = *f._repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ BucketId bucket_id;
+ auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto bucket = f.make_bucket(i);
- auto document = f.make_document(i);
+ is >> bucket_id;
+ Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ auto document = std::make_unique<Document>(repo, is);
provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
}
+ assert(is.empty());
pending_tracker.drain();
}
void
-run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params)
+run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params)
{
LOG(info, "putAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, range, time_bias]()
- { put_async_task(f, range, time_bias); }));
+ executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { put_async_task(f, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -473,30 +524,49 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor
time_bias += bm_params.get_documents();
}
+vespalib::nbostream
+make_update_feed(PersistenceProviderFixture &f, BMRange range)
+{
+ vespalib::nbostream serialized_feed;
+ LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ serialized_feed << f.make_bucket_id(i);
+ auto document_update = f.make_document_update(i);
+ document_update->serializeHEAD(serialized_feed);
+ }
+ return serialized_feed;
+}
+
void
-update_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
+update_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
MyPendingTracker pending_tracker(100);
auto &provider = *f._persistence_engine;
auto &context = f._context;
+ auto &repo = *f._repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ BucketId bucket_id;
+ auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto bucket = f.make_bucket(i);
- auto document_update = f.make_document_update(i);
+ is >> bucket_id;
+ Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ auto document_update = DocumentUpdate::createHEAD(repo, is);
provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
}
+ assert(is.empty());
pending_tracker.drain();
}
void
-run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params)
+run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params)
{
LOG(info, "updateAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, range, time_bias]()
- { update_async_task(f, range, time_bias); }));
+ executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { update_async_task(f, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -505,30 +575,49 @@ run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu
time_bias += bm_params.get_documents();
}
+vespalib::nbostream
+make_remove_feed(PersistenceProviderFixture &f, BMRange range)
+{
+ vespalib::nbostream serialized_feed;
+    LOG(debug, "make_remove_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ serialized_feed << f.make_bucket_id(i);
+ auto document_id = f.make_document_id(i);
+ vespalib::string raw_id = document_id.toString();
+ serialized_feed.write(raw_id.c_str(), raw_id.size() + 1);
+ }
+ return serialized_feed;
+}
+
void
-remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
+remove_async_task(PersistenceProviderFixture &f, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
MyPendingTracker pending_tracker(100);
auto &provider = *f._persistence_engine;
auto &context = f._context;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ BucketId bucket_id;
+ auto bucket_space = f._bucket_space;
for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto bucket = f.make_bucket(i);
- auto document_id = f.make_document_id(i);
+ is >> bucket_id;
+ Bucket bucket(document::Bucket(bucket_space, bucket_id), PartitionId(0));
+ DocumentId document_id(is);
provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
}
+ assert(is.empty());
pending_tracker.drain();
}
void
-run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams &bm_params)
+run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams &bm_params)
{
LOG(info, "removeAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, range, time_bias]()
- { remove_async_task(f, range, time_bias); }));
+ executor.execute(makeLambdaTask([&f, &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { remove_async_task(f, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
@@ -547,14 +636,19 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
+ auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
+ auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update");
+ auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_remove_feed(f, range); }, "remove");
int64_t time_bias = 1;
- run_put_async_tasks(f, executor, 0, time_bias, bm_params);
- run_put_async_tasks(f, executor, 1, time_bias, bm_params);
+ for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) {
+ run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params);
+ }
for (uint32_t pass = 0; pass < bm_params.get_update_passes(); ++pass) {
- run_update_async_tasks(f, executor, pass, time_bias, bm_params);
+ run_update_async_tasks(f, executor, pass, time_bias, update_feed, bm_params);
+ }
+ for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) {
+ run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params);
}
- run_remove_async_tasks(f, executor, 0, time_bias, bm_params);
- run_remove_async_tasks(f, executor, 1, time_bias, bm_params);
}
class App : public FastOS_Application
@@ -585,8 +679,10 @@ App::usage()
std::cerr <<
"vespa-spi-feed-bm\n"
"[--threads threads]\n"
- "[--documents documents]"
- "[--update-passes update-passes]" << std::endl;
+ "[--documents documents]\n"
+ "[--put-passes put-passes]\n"
+ "[--update-passes update-passes]\n"
+ "[--remove-passes remove-passes]" << std::endl;
}
bool
@@ -598,12 +694,16 @@ App::get_options()
static struct option long_opts[] = {
{ "threads", 1, nullptr, 0 },
{ "documents", 1, nullptr, 0 },
- { "update-passes", 1, nullptr, 0 }
+ { "put-passes", 1, nullptr, 0 },
+ { "update-passes", 1, nullptr, 0 },
+ { "remove-passes", 1, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_THREADS,
LONGOPT_DOCUMENTS,
- LONGOPT_UPDATE_PASSES
+ LONGOPT_PUT_PASSES,
+ LONGOPT_UPDATE_PASSES,
+ LONGOPT_REMOVE_PASSES
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -617,9 +717,15 @@ App::get_options()
case LONGOPT_DOCUMENTS:
_bm_params.set_documents(atoi(opt_argument));
break;
+ case LONGOPT_PUT_PASSES:
+ _bm_params.set_put_passes(atoi(opt_argument));
+ break;
case LONGOPT_UPDATE_PASSES:
_bm_params.set_update_passes(atoi(opt_argument));
break;
+ case LONGOPT_REMOVE_PASSES:
+ _bm_params.set_remove_passes(atoi(opt_argument));
+ break;
default:
return false;
}