summaryrefslogtreecommitdiffstats
path: root/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-10-13 16:14:24 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-10-13 17:50:42 +0200
commita1d1b5078c979c936255e5f39757b6b0f36b2d8a (patch)
treebab3b5da82a7f62deb24a5735c608a3164f177ef /searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
parentff2afad2393bf61e4cadf1bde7bd678e92a5b435 (diff)
Benchmark get operations.
Diffstat (limited to 'searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp74
1 files changed, 72 insertions, 2 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 842f24cbd65..506881ec61e 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -26,6 +26,7 @@
#include <vespa/config-upgrading.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/document_type_repo_factory.h>
@@ -121,6 +122,7 @@ using document::DocumentUpdate;
using document::DocumenttypesConfig;
using document::DocumenttypesConfigBuilder;
using document::Field;
+using document::FieldSetRepo;
using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
@@ -260,6 +262,7 @@ public:
class BMParams {
uint32_t _documents;
uint32_t _client_threads;
+ uint32_t _get_passes;
vespalib::string _indexing_sequencer;
uint32_t _put_passes;
uint32_t _update_passes;
@@ -282,6 +285,7 @@ public:
BMParams()
: _documents(160000),
_client_threads(1),
+ _get_passes(0),
_indexing_sequencer(),
_put_passes(2),
_update_passes(1),
@@ -305,6 +309,7 @@ public:
uint32_t get_documents() const { return _documents; }
uint32_t get_max_pending() const { return _max_pending; }
uint32_t get_client_threads() const { return _client_threads; }
+ uint32_t get_get_passes() const { return _get_passes; }
const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
@@ -321,6 +326,7 @@ public:
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
+ void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; }
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
@@ -641,6 +647,7 @@ struct PersistenceProviderFixture {
MyPersistenceEngineOwner _persistence_owner;
MyResourceWriteFilter _write_filter;
std::shared_ptr<PersistenceEngine> _persistence_engine;
+ std::unique_ptr<const FieldSetRepo> _field_set_repo;
uint32_t _bucket_bits;
MyServiceLayerConfig _service_layer_config;
MyDistributorConfig _distributor_config;
@@ -709,6 +716,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_persistence_owner(),
_write_filter(),
_persistence_engine(),
+ _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)),
_bucket_bits(16),
_service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
_distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
@@ -733,7 +741,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_distributor_config.add_builders(_config_set);
_rpc_client_config.add_builders(_config_set);
_message_bus_config.add_builders(_config_set);
- _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, params.get_skip_get_spi_bucket_info());
+ _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info());
}
PersistenceProviderFixture::~PersistenceProviderFixture()
@@ -825,7 +833,7 @@ PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const
void
PersistenceProviderFixture::create_buckets()
{
- SpiBmFeedHandler feed_handler(*_persistence_engine, false);
+ SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false);
for (unsigned int i = 0; i < num_buckets(); ++i) {
feed_handler.create_bucket(make_bucket(i));
}
@@ -1128,6 +1136,45 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
time_bias += bm_params.get_documents();
}
+void
+get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed)
+{
+ LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
+ feedbm::PendingTracker pending_tracker(max_pending);
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ BucketId bucket_id;
+ vespalib::string all_fields(document::AllFields::NAME);
+ auto bucket_space = f._bucket_space;
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(bucket_space, bucket_id);
+ DocumentId document_id(is);
+ f._feed_handler->get(bucket, all_fields, document_id, pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass,
+ const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
+{
+ uint32_t old_errors = f._feed_handler->get_error_count();
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
+ auto range = bm_params.get_range(i);
+ executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
+ { get_async_task(f, max_pending, range, serialized_feed); }));
+ }
+ executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ double throughput = bm_params.get_documents() / elapsed.count();
+ sampler.sample(throughput);
+ LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput);
+}
+
vespalib::nbostream
make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
{
@@ -1214,6 +1261,22 @@ benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
}
void
+benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
+ const std::vector<vespalib::nbostream>& feed, const BMParams& params)
+{
+ if (params.get_get_passes() == 0) {
+ return;
+ }
+ LOG(info, "--------------------------------");
+ LOG(info, "getAsync: %u small documents, passes=%u", params.get_documents(), params.get_get_passes());
+ AvgSampler sampler;
+ for (uint32_t pass = 0; pass < params.get_get_passes(); ++pass) {
+ run_get_async_tasks(f, executor, pass, feed, params, sampler);
+ }
+ LOG(info, "getAsync: AVG gets/s: %8.2f", sampler.avg());
+}
+
+void
benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params)
{
@@ -1258,6 +1321,7 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str());
benchmark_async_put(f, executor, time_bias, put_feed, bm_params);
benchmark_async_update(f, executor, time_bias, update_feed, bm_params);
+ benchmark_async_get(f, executor, remove_feed, bm_params);
benchmark_async_remove(f, executor, time_bias, remove_feed, bm_params);
LOG(info, "--------------------------------");
@@ -1295,6 +1359,7 @@ App::usage()
std::cerr <<
"vespa-feed-bm\n"
"[--client-threads threads]\n"
+ "[--get-passes get-passes]\n"
"[--indexing-sequencer [latency,throughput,adaptive]]\n"
"[--max-pending max-pending]\n"
"[--documents documents]\n"
@@ -1324,6 +1389,7 @@ App::get_options()
{ "documents", 1, nullptr, 0 },
{ "enable-distributor", 0, nullptr, 0 },
{ "enable-service-layer", 0, nullptr, 0 },
+ { "get-passes", 1, nullptr, 0 },
{ "indexing-sequencer", 1, nullptr, 0 },
{ "max-pending", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
@@ -1343,6 +1409,7 @@ App::get_options()
LONGOPT_DOCUMENTS,
LONGOPT_ENABLE_DISTRIBUTOR,
LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_GET_PASSES,
LONGOPT_INDEXING_SEQUENCER,
LONGOPT_MAX_PENDING,
LONGOPT_PUT_PASSES,
@@ -1375,6 +1442,9 @@ App::get_options()
case LONGOPT_ENABLE_SERVICE_LAYER:
_bm_params.set_enable_service_layer(true);
break;
+ case LONGOPT_GET_PASSES:
+ _bm_params.set_get_passes(atoi(opt_argument));
+ break;
case LONGOPT_INDEXING_SEQUENCER:
_bm_params.set_indexing_sequencer(opt_argument);
break;