aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-15 16:44:26 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-15 16:44:26 +0200
commitff46fe04c98a52962a57e5484dacd4901e20044d (patch)
treee01ebc8f223023713a8a82ae6e6ed970da384345 /searchcore
parented29df6d44c73acd7efe5368b0e5e4c7a0720b80 (diff)
Move portions of BMParams to BmFeedParams.
Move more feed functions to BmFeeder and BmFeed.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp215
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h26
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp95
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed.h17
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp42
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h37
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp199
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h50
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp1
10 files changed, 420 insertions, 264 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 8ed74d4e92f..451e9fdefbb 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -4,10 +4,13 @@
#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/fastos/app.h>
+#include <vespa/searchcore/bmcluster/avg_sampler.h>
#include <vespa/searchcore/bmcluster/bm_cluster.h>
#include <vespa/searchcore/bmcluster/bm_cluster_controller.h>
#include <vespa/searchcore/bmcluster/bm_cluster_params.h>
#include <vespa/searchcore/bmcluster/bm_feed.h>
+#include <vespa/searchcore/bmcluster/bm_feeder.h>
+#include <vespa/searchcore/bmcluster/bm_feed_params.h>
#include <vespa/searchcore/bmcluster/bm_node.h>
#include <vespa/searchcore/bmcluster/bm_range.h>
#include <vespa/searchcore/bmcluster/bucket_selector.h>
@@ -33,11 +36,14 @@ using document::DocumentTypeRepo;
using document::DocumentTypeRepoFactory;
using document::DocumenttypesConfig;
using document::DocumenttypesConfigBuilder;
+using search::bmcluster::AvgSampler;
using search::bmcluster::BmClusterController;
using search::bmcluster::IBmFeedHandler;
using search::bmcluster::BmClusterParams;
using search::bmcluster::BmCluster;
using search::bmcluster::BmFeed;
+using search::bmcluster::BmFeedParams;
+using search::bmcluster::BmFeeder;
using search::bmcluster::BmNode;
using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
@@ -58,41 +64,27 @@ std::shared_ptr<DocumenttypesConfig> make_document_types() {
return std::make_shared<DocumenttypesConfig>(builder.config());
}
-class BMParams : public BmClusterParams {
- uint32_t _documents;
- uint32_t _client_threads;
+class BMParams : public BmClusterParams,
+ public BmFeedParams
+{
uint32_t _get_passes;
uint32_t _put_passes;
uint32_t _update_passes;
uint32_t _remove_passes;
- uint32_t _max_pending;
- uint32_t get_start(uint32_t thread_id) const {
- return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads);
- }
public:
BMParams()
- : _documents(160000),
- _client_threads(1),
+ : BmClusterParams(),
+ BmFeedParams(),
_get_passes(0),
_put_passes(2),
_update_passes(1),
- _remove_passes(2),
- _max_pending(1000)
+ _remove_passes(2)
{
}
- BmRange get_range(uint32_t thread_id) const {
- return BmRange(get_start(thread_id), get_start(thread_id + 1));
- }
- uint32_t get_documents() const { return _documents; }
- uint32_t get_max_pending() const { return _max_pending; }
- uint32_t get_client_threads() const { return _client_threads; }
uint32_t get_get_passes() const { return _get_passes; }
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
- void set_documents(uint32_t documents_in) { _documents = documents_in; }
- void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
- void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
@@ -106,16 +98,7 @@ BMParams::check() const
if (!BmClusterParams::check()) {
return false;
}
- if (_client_threads < 1) {
- std::cerr << "Too few client threads: " << _client_threads << std::endl;
- return false;
- }
- if (_client_threads > 256) {
- std::cerr << "Too many client threads: " << _client_threads << std::endl;
- return false;
- }
- if (_documents < _client_threads) {
- std::cerr << "Too few documents: " << _documents << std::endl;
+ if (!BmFeedParams::check()) {
return false;
}
if (_put_passes < 1) {
@@ -133,7 +116,6 @@ struct PersistenceProviderFixture {
std::shared_ptr<const DocumentTypeRepo> _repo;
std::unique_ptr<BmCluster> _bm_cluster;
BmFeed _feed;
- IBmFeedHandler* _feed_handler;
explicit PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
@@ -143,155 +125,27 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
: _document_types(make_document_types()),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
_bm_cluster(std::make_unique<BmCluster>(base_dir, base_port, params, _document_types, _repo)),
- _feed(_repo),
- _feed_handler(nullptr)
+ _feed(_repo)
{
_bm_cluster->make_nodes();
}
PersistenceProviderFixture::~PersistenceProviderFixture() = default;
-std::vector<vespalib::nbostream>
-make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
-{
- LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents());
- std::vector<vespalib::nbostream> serialized_feed_v;
- auto start_time = std::chrono::steady_clock::now();
- serialized_feed_v.resize(bm_params.get_client_threads());
- for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
- auto range = bm_params.get_range(i);
- BucketSelector bucket_selector(i, bm_params.get_client_threads(), num_buckets);
- executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]()
- { serialized_feed_v[i] = func(range, bucket_selector); }));
- }
- executor.sync();
- auto end_time = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed = end_time - start_time;
- LOG(info, "%8.2f %s data elements/s", bm_params.get_documents() / elapsed.count(), label.c_str());
- return serialized_feed_v;
-}
-
-class AvgSampler {
-private:
- double _total;
- size_t _samples;
-
-public:
- AvgSampler() : _total(0), _samples(0) {}
- void sample(double val) {
- _total += val;
- ++_samples;
- }
- double avg() const { return _total / (double)_samples; }
-};
-
-void
-run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
- const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
-{
- auto& feed = f._feed;
- auto& feed_handler = *f._feed_handler;
- uint32_t old_errors = feed_handler.get_error_count();
- auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
- auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
- }
- executor.sync();
- auto end_time = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = feed_handler.get_error_count() - old_errors;
- double throughput = bm_params.get_documents() / elapsed.count();
- sampler.sample(throughput);
- LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput);
- time_bias += bm_params.get_documents();
-}
-
-void
-run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
- const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
-{
- auto& feed = f._feed;
- auto& feed_handler = *f._feed_handler;
- uint32_t old_errors = feed_handler.get_error_count();
- auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
- auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
- }
- executor.sync();
- auto end_time = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = feed_handler.get_error_count() - old_errors;
- double throughput = bm_params.get_documents() / elapsed.count();
- sampler.sample(throughput);
- LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput);
- time_bias += bm_params.get_documents();
-}
-
-void
-run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass,
- const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
-{
- auto& feed = f._feed;
- auto& feed_handler = *f._feed_handler;
- uint32_t old_errors = feed_handler.get_error_count();
- auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
- auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
- { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); }));
- }
- executor.sync();
- auto end_time = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = feed_handler.get_error_count() - old_errors;
- double throughput = bm_params.get_documents() / elapsed.count();
- sampler.sample(throughput);
- LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput);
-}
-
-void
-run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
- const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
-{
- auto& feed = f._feed;
- auto& feed_handler = *f._feed_handler;
- uint32_t old_errors = feed_handler.get_error_count();
- auto start_time = std::chrono::steady_clock::now();
- for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
- auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
- }
- executor.sync();
- auto end_time = std::chrono::steady_clock::now();
- std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = feed_handler.get_error_count() - old_errors;
- double throughput = bm_params.get_documents() / elapsed.count();
- sampler.sample(throughput);
- LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput);
- time_bias += bm_params.get_documents();
-}
-
void
-benchmark_async_put(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
- int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params)
+benchmark_async_put(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params)
{
AvgSampler sampler;
LOG(info, "--------------------------------");
LOG(info, "putAsync: %u small documents, passes=%u", params.get_documents(), params.get_put_passes());
for (uint32_t pass = 0; pass < params.get_put_passes(); ++pass) {
- run_put_async_tasks(f, executor, pass, time_bias, feed, params, sampler);
+ feeder.run_put_async_tasks(pass, time_bias, serialized_feed, params, sampler);
}
LOG(info, "putAsync: AVG puts/s: %8.2f", sampler.avg());
}
void
-benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
- int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params)
+benchmark_async_update(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params)
{
if (params.get_update_passes() == 0) {
return;
@@ -300,14 +154,13 @@ benchmark_async_update(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
LOG(info, "--------------------------------");
LOG(info, "updateAsync: %u small documents, passes=%u", params.get_documents(), params.get_update_passes());
for (uint32_t pass = 0; pass < params.get_update_passes(); ++pass) {
- run_update_async_tasks(f, executor, pass, time_bias, feed, params, sampler);
+ feeder.run_update_async_tasks(pass, time_bias, serialized_feed, params, sampler);
}
LOG(info, "updateAsync: AVG updates/s: %8.2f", sampler.avg());
}
void
-benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
- const std::vector<vespalib::nbostream>& feed, const BMParams& params)
+benchmark_async_get(BmFeeder& feeder, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params)
{
if (params.get_get_passes() == 0) {
return;
@@ -316,14 +169,13 @@ benchmark_async_get(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor
LOG(info, "getAsync: %u small documents, passes=%u", params.get_documents(), params.get_get_passes());
AvgSampler sampler;
for (uint32_t pass = 0; pass < params.get_get_passes(); ++pass) {
- run_get_async_tasks(f, executor, pass, feed, params, sampler);
+ feeder.run_get_async_tasks(pass, serialized_feed, params, sampler);
}
LOG(info, "getAsync: AVG gets/s: %8.2f", sampler.avg());
}
void
-benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor,
- int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const BMParams& params)
+benchmark_async_remove(BmFeeder& feeder, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed, const BMParams& params)
{
if (params.get_remove_passes() == 0) {
return;
@@ -332,32 +184,31 @@ benchmark_async_remove(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
LOG(info, "removeAsync: %u small documents, passes=%u", params.get_documents(), params.get_remove_passes());
AvgSampler sampler;
for (uint32_t pass = 0; pass < params.get_remove_passes(); ++pass) {
- run_remove_async_tasks(f, executor, pass, time_bias, feed, params, sampler);
+ feeder.run_remove_async_tasks(pass, time_bias, serialized_feed, params, sampler);
}
LOG(info, "removeAsync: AVG removes/s: %8.2f", sampler.avg());
}
-void benchmark_async_spi(const BMParams &bm_params)
+void benchmark_async(const BMParams &bm_params)
{
vespalib::rmdir(base_dir, true);
PersistenceProviderFixture f(bm_params);
auto& cluster = *f._bm_cluster;
cluster.start(f._feed);
- f._feed_handler = cluster.get_feed_handler();
vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki);
+ BmFeeder feeder(f._repo, *cluster.get_feed_handler(), executor);
auto& feed = f._feed;
- auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
- auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
- auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
+ auto put_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
+ auto update_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
+ auto remove_feed = feed.make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
int64_t time_bias = 1;
- LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str());
- benchmark_async_put(f, executor, time_bias, put_feed, bm_params);
- benchmark_async_update(f, executor, time_bias, update_feed, bm_params);
- benchmark_async_get(f, executor, remove_feed, bm_params);
- benchmark_async_remove(f, executor, time_bias, remove_feed, bm_params);
+ LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
+ benchmark_async_put(feeder, time_bias, put_feed, bm_params);
+ benchmark_async_update(feeder, time_bias, update_feed, bm_params);
+ benchmark_async_get(feeder, remove_feed, bm_params);
+ benchmark_async_remove(feeder, time_bias, remove_feed, bm_params);
LOG(info, "--------------------------------");
- f._feed_handler = nullptr;
cluster.stop();
}
@@ -562,7 +413,7 @@ App::Main()
usage();
return 1;
}
- benchmark_async_spi(_bm_params);
+ benchmark_async(_bm_params);
return 0;
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 17f9bff38d3..184ada0ab38 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -6,6 +6,8 @@ vespa_add_library(searchcore_bmcluster STATIC
bm_cluster_params.cpp
bm_distribution.cpp
bm_feed.cpp
+ bm_feeder.cpp
+ bm_feed_params.cpp
bm_message_bus.cpp
bm_message_bus_routes.cpp
bm_node.cpp
diff --git a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h
new file mode 100644
index 00000000000..69562d42276
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstddef>
+
+namespace search::bmcluster {
+
+/*
+ * Class used to calculate average value of samples.
+ */
+class AvgSampler {
+private:
+ double _total;
+ size_t _samples;
+
+public:
+ AvgSampler() : _total(0), _samples(0) {}
+ void sample(double val) {
+ _total += val;
+ ++_samples;
+ }
+ double avg() const { return _total / (double)_samples; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp
index e082f2b96a1..1a822dc1769 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bm_feed.h"
+#include "avg_sampler.h"
+#include "bm_feed_params.h"
#include "bm_range.h"
#include "bucket_selector.h"
#include "pending_tracker.h"
@@ -15,8 +17,11 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/update/documentupdate.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <cassert>
+#include <chrono>
#include <vespa/log/log.h>
LOG_SETUP(".bmcluster.bm_feed");
@@ -29,6 +34,7 @@ using document::DocumentTypeRepo;
using document::DocumentUpdate;
using document::IntFieldValue;
using document::FieldUpdate;
+using vespalib::makeLambdaTask;
namespace search::bmcluster {
@@ -115,81 +121,24 @@ BmFeed::make_remove_feed(BmRange range, BucketSelector bucket_selector)
}
-void
-BmFeed::put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+std::vector<vespalib::nbostream>
+BmFeed::make_feed(vespalib::ThreadStackExecutor& executor, const BmFeedParams& params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
{
- LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- PendingTracker pending_tracker(max_pending);
- feed_handler.attach_bucket_info_queue(pending_tracker);
- auto &repo = *_repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- document::BucketId bucket_id;
- bool use_timestamp = !feed_handler.manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(_bucket_space, bucket_id);
- auto document = std::make_unique<Document>(repo, is);
- feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
-void
-BmFeed::update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- PendingTracker pending_tracker(max_pending);
- feed_handler.attach_bucket_info_queue(pending_tracker);
- auto &repo = *_repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- document::BucketId bucket_id;
- bool use_timestamp = !feed_handler.manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(_bucket_space, bucket_id);
- auto document_update = DocumentUpdate::createHEAD(repo, is);
- feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
-void
-BmFeed::get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed)
-{
- LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
- search::bmcluster::PendingTracker pending_tracker(max_pending);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- document::BucketId bucket_id;
- vespalib::string all_fields(document::AllFields::NAME);
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(_bucket_space, bucket_id);
- DocumentId document_id(is);
- feed_handler.get(bucket, all_fields, document_id, pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
-void
-BmFeed::remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- search::bmcluster::PendingTracker pending_tracker(max_pending);
- feed_handler.attach_bucket_info_queue(pending_tracker);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- document::BucketId bucket_id;
- bool use_timestamp = !feed_handler.manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(_bucket_space, bucket_id);
- DocumentId document_id(is);
- feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ LOG(info, "make_feed %s %u small documents", label.c_str(), params.get_documents());
+ std::vector<vespalib::nbostream> serialized_feed_v;
+ auto start_time = std::chrono::steady_clock::now();
+ serialized_feed_v.resize(params.get_client_threads());
+ for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
+ auto range = params.get_range(i);
+ BucketSelector bucket_selector(i, params.get_client_threads(), num_buckets);
+ executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]()
+ { serialized_feed_v[i] = func(range, bucket_selector); }));
}
- assert(is.empty());
- pending_tracker.drain();
+ executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ LOG(info, "%8.2f %s data elements/s", params.get_documents() / elapsed.count(), label.c_str());
+ return serialized_feed_v;
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h
index a6afe7b10d9..f8596723634 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h
@@ -4,7 +4,6 @@
#include <vespa/document/base/documentid.h>
#include <vespa/document/bucket/bucketspace.h>
-#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucket.h>
namespace document {
@@ -17,13 +16,18 @@ class Field;
}
-namespace vespalib { class nbostream; }
+namespace vespalib {
+
+class ThreadStackExecutor;
+class nbostream;
+
+}
namespace search::bmcluster {
+class BmFeedParams;
class BmRange;
class BucketSelector;
-class IBmFeedHandler;
/*
* Class to generate synthetic feed of documents.
@@ -35,7 +39,7 @@ class BmFeed {
uint32_t _bucket_bits;
document::BucketSpace _bucket_space;
public:
-
+
BmFeed(std::shared_ptr<const document::DocumentTypeRepo> document_types);
~BmFeed();
uint32_t num_buckets() const { return (1u << _bucket_bits); }
@@ -48,10 +52,7 @@ public:
vespalib::nbostream make_put_feed(BmRange range, BucketSelector bucket_selector);
vespalib::nbostream make_update_feed(BmRange range, BucketSelector bucket_selector);
vespalib::nbostream make_remove_feed(BmRange range, BucketSelector bucket_selector);
- void put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
- void update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
- void get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed);
- void remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ std::vector<vespalib::nbostream> make_feed(vespalib::ThreadStackExecutor& executor, const BmFeedParams& bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string& label);
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp
new file mode 100644
index 00000000000..773232556e7
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.cpp
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_feed_params.h"
+#include "bm_range.h"
+#include <iostream>
+
+namespace search::bmcluster {
+
+BmFeedParams::BmFeedParams()
+ : _client_threads(1),
+ _documents(160000),
+ _max_pending(1000)
+{
+}
+
+BmFeedParams::~BmFeedParams() = default;
+
+BmRange
+BmFeedParams::get_range(uint32_t thread_id) const
+{
+ return BmRange(get_start(thread_id), get_start(thread_id + 1));
+}
+
+bool
+BmFeedParams::check() const
+{
+ if (_client_threads < 1) {
+ std::cerr << "Too few client threads: " << _client_threads << std::endl;
+ return false;
+ }
+ if (_client_threads > 256) {
+ std::cerr << "Too many client threads: " << _client_threads << std::endl;
+ return false;
+ }
+ if (_documents < _client_threads) {
+ std::cerr << "Too few documents: " << _documents << std::endl;
+ return false;
+ }
+ return true;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h
new file mode 100644
index 00000000000..bc992422982
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed_params.h
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace search::bmcluster {
+
+class BmRange;
+
+/*
+ * Parameters for generating synthetic feed of documents and for
+ * feeding them to the cluster.
+ */
+class BmFeedParams
+{
+ uint32_t _client_threads;
+ uint32_t _documents;
+ uint32_t _max_pending;
+ uint32_t get_start(uint32_t thread_id) const {
+ return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads);
+ }
+public:
+ BmFeedParams();
+ ~BmFeedParams();
+ uint32_t get_client_threads() const { return _client_threads; }
+ uint32_t get_documents() const { return _documents; }
+ uint32_t get_max_pending() const { return _max_pending; }
+ BmRange get_range(uint32_t thread_id) const;
+ void set_documents(uint32_t documents_in) { _documents = documents_in; }
+ void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
+ void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
+ bool check() const;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
new file mode 100644
index 00000000000..1be39f551b9
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
@@ -0,0 +1,199 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_feeder.h"
+#include "avg_sampler.h"
+#include "bm_feed_params.h"
+#include "bm_range.h"
+#include "bucket_selector.h"
+#include "pending_tracker.h"
+#include "i_bm_feed_handler.h"
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <cassert>
+#include <chrono>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bmcluster.bm_feeder");
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentTypeRepo;
+using document::DocumentUpdate;
+using vespalib::makeLambdaTask;
+
+namespace search::bmcluster {
+
+BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor)
+ : _repo(std::move(repo)),
+ _bucket_space(document::test::makeBucketSpace("test")),
+ _feed_handler(feed_handler),
+ _executor(executor)
+{
+}
+
+BmFeeder::~BmFeeder() = default;
+
+void
+BmFeeder::put_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ _feed_handler.attach_bucket_info_queue(pending_tracker);
+ auto &repo = *_repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !_feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ auto document = std::make_unique<Document>(repo, is);
+ _feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeeder::update_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ _feed_handler.attach_bucket_info_queue(pending_tracker);
+ auto &repo = *_repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !_feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ auto document_update = DocumentUpdate::createHEAD(repo, is);
+ _feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeeder::get_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed)
+{
+ LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ vespalib::string all_fields(document::AllFields::NAME);
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ DocumentId document_id(is);
+ _feed_handler.get(bucket, all_fields, document_id, pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeeder::remove_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ _feed_handler.attach_bucket_info_queue(pending_tracker);
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !_feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ DocumentId document_id(is);
+ _feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeeder::run_put_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler)
+{
+ uint32_t old_errors = _feed_handler.get_error_count();
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
+ auto range = params.get_range(i);
+ _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { put_async_task(max_pending, range, serialized_feed, time_bias); }));
+ }
+ _executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ uint32_t new_errors = _feed_handler.get_error_count() - old_errors;
+ double throughput = params.get_documents() / elapsed.count();
+ sampler.sample(throughput);
+ LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput);
+ time_bias += params.get_documents();
+}
+
+void
+BmFeeder::run_update_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler)
+{
+ uint32_t old_errors = _feed_handler.get_error_count();
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
+ auto range = params.get_range(i);
+ _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { update_async_task(max_pending, range, serialized_feed, time_bias); }));
+ }
+ _executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ uint32_t new_errors = _feed_handler.get_error_count() - old_errors;
+ double throughput = params.get_documents() / elapsed.count();
+ sampler.sample(throughput);
+ LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput);
+ time_bias += params.get_documents();
+}
+
+void
+BmFeeder::run_get_async_tasks(int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler)
+{
+ uint32_t old_errors = _feed_handler.get_error_count();
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
+ auto range = params.get_range(i);
+ _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
+ { get_async_task(max_pending, range, serialized_feed); }));
+ }
+ _executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ uint32_t new_errors = _feed_handler.get_error_count() - old_errors;
+ double throughput = params.get_documents() / elapsed.count();
+ sampler.sample(throughput);
+ LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput);
+}
+
+void
+BmFeeder::run_remove_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler)
+{
+ uint32_t old_errors = _feed_handler.get_error_count();
+ auto start_time = std::chrono::steady_clock::now();
+ for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
+ auto range = params.get_range(i);
+ _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { remove_async_task(max_pending, range, serialized_feed, time_bias); }));
+ }
+ _executor.sync();
+ auto end_time = std::chrono::steady_clock::now();
+ std::chrono::duration<double> elapsed = end_time - start_time;
+ uint32_t new_errors = _feed_handler.get_error_count() - old_errors;
+ double throughput = params.get_documents() / elapsed.count();
+ sampler.sample(throughput);
+ LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput);
+ time_bias += params.get_documents();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
new file mode 100644
index 00000000000..0e404f731c6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
@@ -0,0 +1,50 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/document/bucket/bucketspace.h>
+#include <memory>
+
+namespace document {
+
+class DocumentTypeRepo;
+
+}
+
+namespace vespalib {
+
+class ThreadStackExecutor;
+class nbostream;
+
+}
+
+namespace search::bmcluster {
+
+class AvgSampler;
+class BmFeedParams;
+class BmRange;
+class IBmFeedHandler;
+
+/*
+ * Class to feed serialized feed operations to a feed handler.
+ */
+class BmFeeder {
+ std::shared_ptr<const document::DocumentTypeRepo> _repo;
+ document::BucketSpace _bucket_space;
+ IBmFeedHandler& _feed_handler;
+ vespalib::ThreadStackExecutor& _executor;
+public:
+ BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor);
+ ~BmFeeder();
+ void put_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ void update_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ void get_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed);
+ void remove_async_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ void run_put_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler);
+ void run_update_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler);
+ void run_get_async_tasks(int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler);
+ void run_remove_async_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler);
+ IBmFeedHandler& get_feed_handler() const { return _feed_handler; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
index 4de1f21b86f..0bf53a589d6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
@@ -34,4 +34,3 @@ BucketInfoQueue::get_bucket_info_loop()
}
}
-