summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-18 14:27:03 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-18 14:27:03 +0200
commit63291d89df02bb2c6a0ff1ec95124b7d746410a1 (patch)
tree2efd013bcc5392d97a53f6ecb7c39c5a45eecba8
parent6411525f438f153946af1cbabc0f1a33178c9bcf (diff)
Add options to vespa spi feed benchmark.
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp208
1 files changed, 168 insertions, 40 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
index d2c8e00aa81..6f52b58cb86 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
@@ -34,6 +34,9 @@
#include <vespa/config-indexschema.h>
#include <vespa/config-summary.h>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/fastos/app.h>
+#include <getopt.h>
+#include <iostream>
#include <vespa/log/log.h>
LOG_SETUP("vespa-spi-feed-bm");
@@ -213,6 +216,60 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result
(void) resultHandler;
}
+class BMRange
+{
+ uint32_t _start;
+ uint32_t _end;
+public:
+ BMRange(uint32_t start_in, uint32_t end_in)
+ : _start(start_in),
+ _end(end_in)
+ {
+ }
+ uint32_t get_start() const { return _start; }
+ uint32_t get_end() const { return _end; }
+};
+
+class BMParams {
+ uint32_t _documents;
+ uint32_t _threads;
+ uint32_t get_start(uint32_t thread_id) const {
+ return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
+ }
+public:
+ BMParams()
+ : _documents(160000),
+ _threads(32)
+ {
+ }
+ BMRange get_range(uint32_t thread_id) const {
+ return BMRange(get_start(thread_id), get_start(thread_id + 1));
+ }
+ uint32_t get_documents() const { return _documents; }
+ uint32_t get_threads() const { return _threads; }
+ void set_documents(uint32_t documents_in) { _documents = documents_in; }
+ void set_threads(uint32_t threads_in) { _threads = threads_in; }
+ bool check() const;
+};
+
+bool
+BMParams::check() const
+{
+ if (_threads < 1) {
+ std::cerr << "Too few threads: " << _threads << std::endl;
+ return false;
+ }
+ if (_threads > 256) {
+ std::cerr << "Too many threads: " << _threads << std::endl;
+ return false;
+ }
+ if (_documents < _threads) {
+ std::cerr << "Too few documents: " << _documents << std::endl;
+ return false;
+ }
+ return true;
+}
+
}
@@ -241,17 +298,12 @@ struct PersistenceProviderFixture {
std::shared_ptr<PersistenceEngine> _persistence_engine;
storage::spi::Context _context;
uint32_t _bucket_bits;
-
+
PersistenceProviderFixture();
~PersistenceProviderFixture();
void create_document_db();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); }
- BucketId get_bucket_id(const Document &document) const {
- auto &id = document.getId();
- auto &gid = id.getGlobalId();
- return BucketId(_bucket_bits, gid.convertToBucketId().getRawId()).stripUnused();
- }
DocumentId make_document_id(uint32_t i) const;
std::unique_ptr<Document> make_document(uint32_t i) const;
void create_buckets();
@@ -370,13 +422,13 @@ PersistenceProviderFixture::create_buckets()
}
void
-put_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, int64_t time_bias)
+put_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
{
+ LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
MyPendingTracker pending_tracker(100);
auto &provider = *f._persistence_engine;
auto &context = f._context;
- uint32_t end = start + count;
- for (unsigned int i = start; i < end; ++i) {
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
auto bucket = f.make_bucket(i);
auto document = f.make_document(i);
provider.putAsync(bucket, Timestamp(time_bias + i), std::move(document), context, std::make_unique<MyOperationComplete>(pending_tracker));
@@ -385,31 +437,30 @@ put_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, in
}
void
-run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias)
+run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params)
{
- uint32_t tasks = 32;
- uint32_t per_task = 5000;
- LOG(info, "putAsync %u small documents, pass=%u", tasks * per_task, pass);
+ LOG(info, "putAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < tasks; ++i) {
- executor.execute(makeLambdaTask([&f, i, per_task, time_bias]()
- { put_async_task(f, i * per_task, per_task, time_bias); }));
+ for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
+ auto range = bm_params.get_range(i);
+ executor.execute(makeLambdaTask([&f, range, time_bias]()
+ { put_async_task(f, range, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- LOG(info, "%8.2f puts/s for pass=%u", (tasks * per_task) / elapsed.count(), pass);
- time_bias += tasks * per_task;
+ LOG(info, "%8.2f puts/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass);
+ time_bias += bm_params.get_documents();
}
void
-remove_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count, int64_t time_bias)
+remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
{
+ LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
MyPendingTracker pending_tracker(100);
auto &provider = *f._persistence_engine;
auto &context = f._context;
- uint32_t end = start + count;
- for (unsigned int i = start; i < end; ++i) {
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
auto bucket = f.make_bucket(i);
auto document_id = f.make_document_id(i);
provider.removeAsync(bucket, Timestamp(time_bias + i), document_id, context, std::make_unique<MyOperationComplete>(pending_tracker));
@@ -418,24 +469,23 @@ remove_async_task(PersistenceProviderFixture &f, uint32_t start, uint32_t count,
}
void
-run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias)
+run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams &bm_params)
{
- uint32_t tasks = 32;
- uint32_t per_task = 5000;
- LOG(info, "removeAsync %u small documents, pass=%u", tasks * per_task, pass);
+ LOG(info, "removeAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < tasks; ++i) {
- executor.execute(makeLambdaTask([&f, i, per_task, time_bias]()
- { remove_async_task(f, i * per_task, per_task, time_bias); }));
+ for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
+ auto range = bm_params.get_range(i);
+ executor.execute(makeLambdaTask([&f, range, time_bias]()
+ { remove_async_task(f, range, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- LOG(info, "%8.2f removes/s for pass=%u", (tasks * per_task) / elapsed.count(), pass);
- time_bias += tasks * per_task;
+ LOG(info, "%8.2f removes/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass);
+ time_bias += bm_params.get_documents();
}
-void benchmark_async_spi()
+void benchmark_async_spi(const BMParams &bm_params)
{
vespalib::rmdir(base_dir, true);
PersistenceProviderFixture f;
@@ -444,21 +494,99 @@ void benchmark_async_spi()
provider.initialize();
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
- vespalib::ThreadStackExecutor executor(32, 128 * 1024);
+ vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
int64_t time_bias = 1;
- run_put_async_tasks(f, executor, 0, time_bias);
- run_put_async_tasks(f, executor, 1, time_bias);
- run_remove_async_tasks(f, executor, 0, time_bias);
- run_remove_async_tasks(f, executor, 1, time_bias);
+ run_put_async_tasks(f, executor, 0, time_bias, bm_params);
+ run_put_async_tasks(f, executor, 1, time_bias, bm_params);
+ run_remove_async_tasks(f, executor, 0, time_bias, bm_params);
+ run_remove_async_tasks(f, executor, 1, time_bias, bm_params);
+}
+
+class App : public FastOS_Application
+{
+ BMParams _bm_params;
+public:
+ App();
+ ~App() override;
+ void usage();
+ bool get_options();
+ int Main() override;
+};
+
+App::App()
+ : _bm_params()
+{
+}
+
+App::~App() = default;
+
+void
+App::usage()
+{
+ std::cerr <<
+ "vespa-spi-feed-bm version 0.0\n"
+ "\n"
+ "USAGE:\n";
+ std::cerr <<
+ "vespa-spi-feed-bm\n"
+ "[--threads threads]\n"
+ "[--documents documents]" << std::endl;
+}
+
+bool
+App::get_options()
+{
+ int c;
+ const char *opt_argument = nullptr;
+ int long_opt_index = 0;
+ static struct option long_opts[] = {
+ { "threads", 1, nullptr, 0 },
+ { "documents", 1, nullptr, 0 }
+ };
+ enum longopts_enum {
+ LONGOPT_THREADS,
+ LONGOPT_DOCUMENTS
+ };
+ int opt_index = 1;
+ resetOptIndex(opt_index);
+ while ((c = GetOptLong("", opt_argument, opt_index, long_opts, &long_opt_index)) != -1) {
+ switch (c) {
+ case 0:
+ switch(long_opt_index) {
+ case LONGOPT_THREADS:
+ _bm_params.set_threads(atoi(opt_argument));
+ break;
+ case LONGOPT_DOCUMENTS:
+ _bm_params.set_documents(atoi(opt_argument));
+ break;
+ default:
+ return false;
+ }
+ break;
+ default:
+ return false;
+ }
+ }
+ return _bm_params.check();
+}
+
+int
+App::Main()
+{
+ if (!get_options()) {
+ usage();
+ return 1;
+ }
+ benchmark_async_spi(_bm_params);
+ return 0;
}
int
main(int argc, char* argv[])
{
- (void) argc;
- (void) argv;
DummyFileHeaderContext::setCreator("vespa-spi-feed-bm");
- benchmark_async_spi();
+ App app;
+ auto exit_value = app.Entry(argc, argv);
vespalib::rmdir(base_dir, true);
- return 0;
+ return exit_value;
}