summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-18 16:47:47 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-18 16:47:47 +0200
commit41c33fe66dc7133403670fa95a6c8088eb5b6f8f (patch)
tree09b690df31087520d6afd182ae99bd4d134fe5e6
parent64e81a94ed768bc43b1769d272b5fb833edc162d (diff)
Benchmark update operations.
-rw-r--r--searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp68
1 files changed, 64 insertions, 4 deletions
diff --git a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
index 6f52b58cb86..b1ced9da793 100644
--- a/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-spi-feed-bm/vespa_spi_feed_bm.cpp
@@ -12,6 +12,8 @@
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/searchcommon/common/schemaconfigurer.h>
#include <vespa/searchcore/proton/common/hw_info.h>
#include <vespa/searchcore/proton/matching/querylimiter.h>
@@ -50,6 +52,7 @@ using namespace vespa::config::search;
using namespace std::chrono_literals;
using vespa::config::content::core::BucketspacesConfig;
+using document::AssignValueUpdate;
using document::BucketId;
using document::BucketSpace;
using document::Document;
@@ -57,7 +60,9 @@ using document::DocumentId;
using document::DocumentType;
using document::DocumentTypeRepo;
using document::DocumenttypesConfig;
+using document::DocumentUpdate;
using document::Field;
+using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
using search::TuneFileDocumentDB;
@@ -233,13 +238,15 @@ public:
class BMParams {
uint32_t _documents;
uint32_t _threads;
+ uint32_t _update_passes;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _threads) * thread_id + std::min(thread_id, _documents % _threads);
}
public:
BMParams()
: _documents(160000),
- _threads(32)
+ _threads(32),
+ _update_passes(1)
{
}
BMRange get_range(uint32_t thread_id) const {
@@ -247,8 +254,10 @@ public:
}
uint32_t get_documents() const { return _documents; }
uint32_t get_threads() const { return _threads; }
+ uint32_t get_update_passes() const { return _update_passes; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_threads(uint32_t threads_in) { _threads = threads_in; }
+ void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
bool check() const;
};
@@ -306,6 +315,7 @@ struct PersistenceProviderFixture {
Bucket make_bucket(uint32_t i) const { return Bucket(document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))), PartitionId(0)); }
DocumentId make_document_id(uint32_t i) const;
std::unique_ptr<Document> make_document(uint32_t i) const;
+ std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
};
@@ -412,6 +422,15 @@ PersistenceProviderFixture::make_document(uint32_t i) const
return document;
}
+std::unique_ptr<DocumentUpdate>
+PersistenceProviderFixture::make_document_update(uint32_t i) const
+{
+ auto id = make_document_id(i);
+ auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id);
+ document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15))));
+ return document_update;
+}
+
void
PersistenceProviderFixture::create_buckets()
{
@@ -454,6 +473,38 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor
}
void
+update_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
+{
+ LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
+ MyPendingTracker pending_tracker(100);
+ auto &provider = *f._persistence_engine;
+ auto &context = f._context;
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ auto bucket = f.make_bucket(i);
+ auto document_update = f.make_document_update(i);
+ provider.updateAsync(bucket, Timestamp(time_bias + i), std::move(document_update), context, std::make_unique<MyOperationComplete>(pending_tracker));
+ }
+ pending_tracker.drain();
+}
+
+void
+run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const BMParams& bm_params)
+{
+ LOG(info, "updateAsync %u small documents, pass=%u", bm_params.get_documents(), pass);
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < bm_params.get_threads(); ++i) {
+ auto range = bm_params.get_range(i);
+ executor.execute(makeLambdaTask([&f, range, time_bias]()
+ { update_async_task(f, range, time_bias); }));
+ }
+ executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ LOG(info, "%8.2f updates/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass);
+ time_bias += bm_params.get_documents();
+}
+
+void
remove_async_task(PersistenceProviderFixture &f, BMRange range, int64_t time_bias)
{
LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
@@ -498,6 +549,9 @@ void benchmark_async_spi(const BMParams &bm_params)
int64_t time_bias = 1;
run_put_async_tasks(f, executor, 0, time_bias, bm_params);
run_put_async_tasks(f, executor, 1, time_bias, bm_params);
+ for (uint32_t pass = 0; pass < bm_params.get_update_passes(); ++pass) {
+ run_update_async_tasks(f, executor, pass, time_bias, bm_params);
+ }
run_remove_async_tasks(f, executor, 0, time_bias, bm_params);
run_remove_async_tasks(f, executor, 1, time_bias, bm_params);
}
@@ -530,7 +584,8 @@ App::usage()
std::cerr <<
"vespa-spi-feed-bm\n"
"[--threads threads]\n"
- "[--documents documents]" << std::endl;
+ "[--documents documents]"
+ "[--update-passes update-passes]" << std::endl;
}
bool
@@ -541,11 +596,13 @@ App::get_options()
int long_opt_index = 0;
static struct option long_opts[] = {
{ "threads", 1, nullptr, 0 },
- { "documents", 1, nullptr, 0 }
+ { "documents", 1, nullptr, 0 },
+ { "update-passes", 1, nullptr, 0 }
};
enum longopts_enum {
LONGOPT_THREADS,
- LONGOPT_DOCUMENTS
+ LONGOPT_DOCUMENTS,
+ LONGOPT_UPDATE_PASSES
};
int opt_index = 1;
resetOptIndex(opt_index);
@@ -559,6 +616,9 @@ App::get_options()
case LONGOPT_DOCUMENTS:
_bm_params.set_documents(atoi(opt_argument));
break;
+ case LONGOPT_UPDATE_PASSES:
+ _bm_params.set_update_passes(atoi(opt_argument));
+ break;
default:
return false;
}