summary refs log tree commit diff stats
path: root/searchcore/src
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@online.no> 2021-10-06 12:45:35 +0200
committer Tor Egge <Tor.Egge@online.no> 2021-10-06 12:49:23 +0200
commit 780f8c25677588810fa55b82692c468603162d66 (patch)
tree bdefe091f0de4f7506f0437b2c8d74310f931608 /searchcore/src
parent f63d33eb7a47c3bfd80229098a71eae1b3b48300 (diff)
Enable refeed during document redistribution.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp 120
-rw-r--r-- searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h 17
-rw-r--r-- searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp 40
-rw-r--r-- searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h 6
4 files changed, 160 insertions, 23 deletions
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
index 3fc42e50cdb..1272521ff54 100644
--- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -54,6 +54,7 @@ using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
using storage::lib::State;
+using vespalib::makeLambdaTask;
namespace {
@@ -74,7 +75,7 @@ enum class Mode {
PERM_CRASH,
TEMP_CRASH,
REPLACE,
- BAD,
+ BAD
};
std::vector<vespalib::string> mode_names = {
@@ -101,19 +102,44 @@ vespalib::string& get_mode_name(Mode mode) {
return (i < mode_names.size()) ? mode_names[i] : bad_mode_name;
}
+enum ReFeedMode {
+ NONE,
+ PUT,
+ UPDATE,
+ BAD
+};
+
+std::vector<vespalib::string> refeed_mode_names = {
+ "none",
+ "put",
+ "update"
+};
+
+ReFeedMode get_refeed_mode(const vespalib::string& refeed_mode_name) {
+ for (uint32_t i = 0; i < refeed_mode_names.size(); ++i) {
+ if (refeed_mode_name == refeed_mode_names[i]) {
+ return static_cast<ReFeedMode>(i);
+ }
+ }
+ return ReFeedMode::BAD;
+}
+
class BMParams : public BmClusterParams,
public BmFeedParams
{
uint32_t _flip_nodes;
Mode _mode;
+ ReFeedMode _refeed_mode;
bool _use_feed_settle;
public:
BMParams();
uint32_t get_flip_nodes() const noexcept { return _flip_nodes; }
Mode get_mode() const noexcept { return _mode; }
+ ReFeedMode get_refeed_mode() const noexcept { return _refeed_mode; }
bool get_use_feed_settle() const noexcept { return _use_feed_settle; }
void set_flip_nodes(uint32_t value) { _flip_nodes = value; }
void set_mode(Mode value) { _mode = value; }
+ void set_refeed_mode(ReFeedMode value) { _refeed_mode = value; }
void set_use_feed_settle(bool value) { _use_feed_settle = value; }
bool check() const;
};
@@ -123,6 +149,7 @@ BMParams::BMParams()
BmFeedParams(),
_flip_nodes(1u),
_mode(Mode::GROW),
+ _refeed_mode(ReFeedMode::NONE),
_use_feed_settle(false)
{
set_enable_service_layer(true);
@@ -160,9 +187,55 @@ BMParams::check() const
std::cerr << "Bad mode" << std::endl;
return false;
}
+ if (_refeed_mode == ReFeedMode::BAD) {
+ std::cerr << "Bad refeed-mode" << std::endl;
+ return false;
+ }
return true;
}
+class ReFeed {
+ vespalib::ThreadStackExecutor _top_executor;
+ vespalib::ThreadStackExecutor _executor;
+ BmFeeder _feeder;
+ const vespalib::string _op_name;
+ const BMParams& _params;
+ int64_t& _time_bias;
+ const std::vector<vespalib::nbostream>& _feed;
+
+ void run();
+public:
+ ReFeed(const BMParams& params, std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const vespalib::string& op_name);
+ ~ReFeed();
+};
+
+ReFeed::ReFeed(const BMParams& params, std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, int64_t& time_bias, const std::vector<vespalib::nbostream>& feed, const vespalib::string& op_name)
+ : _top_executor(1, 128_Ki),
+ _executor(params.get_client_threads(), 128_Ki),
+ _feeder(repo, feed_handler, _executor),
+ _op_name(op_name),
+ _params(params),
+ _time_bias(time_bias),
+ _feed(feed)
+{
+ _top_executor.execute(makeLambdaTask([this]()
+ { run(); }));
+}
+
+ReFeed::~ReFeed()
+{
+ _feeder.stop();
+ _top_executor.sync();
+ _top_executor.shutdown();
+}
+
+void
+ReFeed::run()
+{
+ std::this_thread::sleep_for(2s);
+ _feeder.run_feed_tasks_loop(_time_bias, _feed, _params, _op_name);
+}
+
}
class Benchmark {
@@ -171,10 +244,14 @@ class Benchmark {
std::shared_ptr<const DocumentTypeRepo> _repo;
std::unique_ptr<BmCluster> _cluster;
BmFeed _feed;
+ std::vector<vespalib::nbostream> _put_feed;
+ std::vector<vespalib::nbostream> _update_feed;
+ int64_t _time_bias;
void adjust_cluster_state_before_feed();
void adjust_cluster_state_after_feed();
void adjust_cluster_state_after_first_redistribution();
+ void make_feed();
void feed();
std::chrono::duration<double> redistribute();
@@ -189,7 +266,10 @@ Benchmark::Benchmark(const BMParams& params)
_document_types(make_document_types()),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
_cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
- _feed(_repo)
+ _feed(_repo),
+ _put_feed(),
+ _update_feed(),
+ _time_bias(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count())
{
_cluster->make_nodes();
}
@@ -274,17 +354,25 @@ Benchmark::adjust_cluster_state_after_first_redistribution()
}
void
+Benchmark::make_feed()
+{
+ vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+ _put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
+ if (_params.get_refeed_mode() == ReFeedMode::UPDATE) {
+ _update_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); }, _feed.num_buckets(), "update");
+ }
+}
+
+void
Benchmark::feed()
{
vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
- auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
BmNodeStatsReporter reporter(*_cluster, false);
reporter.start(500ms);
- int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
AvgSampler sampler;
- feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put");
+ feeder.run_feed_tasks(0, _time_bias, _put_feed, _params, sampler, "put");
reporter.report_now();
if (_params.get_use_feed_settle()) {
LOG(info, "Settling feed");
@@ -302,6 +390,17 @@ Benchmark::redistribute()
reporter.start(500ms);
_cluster->propagate_cluster_state();
reporter.report_now();
+ std::unique_ptr<ReFeed> refeed;
+ switch (_params.get_refeed_mode()) {
+ case ReFeedMode::PUT:
+ refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _put_feed, "put");
+ break;
+ case ReFeedMode::UPDATE:
+ refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _update_feed, "update");
+ break;
+ default:
+ ;
+ }
for (;;) {
auto duration = std::chrono::steady_clock::now() - reporter.get_change_time();
if (duration >= 6s) {
@@ -309,6 +408,7 @@ Benchmark::redistribute()
}
std::this_thread::sleep_for(100ms);
}
+ refeed.reset();
return reporter.get_change_time() - before;
}
@@ -317,6 +417,7 @@ Benchmark::run()
{
adjust_cluster_state_before_feed();
_cluster->start(_feed);
+ make_feed();
feed();
LOG(info, "--------------------------------");
auto old_snapshot = _cluster->get_bucket_db_snapshots();
@@ -382,6 +483,7 @@ App::usage()
"[--mode [grow, shrink, perm-crash, temp-crash, replace]\n"
"[--nodes-per-group nodes-per-group]\n"
"[--redundancy redundancy]\n"
+ "[--refeed-mode [none, put, update]\n"
"[--rpc-events-before-wakeup events]\n"
"[--rpc-network-threads threads]\n"
"[--rpc-targets-per-node targets]\n"
@@ -415,6 +517,7 @@ App::get_options()
{ "mode", 1, nullptr, 0 },
{ "nodes-per-group", 1, nullptr, 0 },
{ "redundancy", 1, nullptr, 0 },
+ { "refeed-mode", 1, nullptr, 0 },
{ "response-threads", 1, nullptr, 0 },
{ "rpc-events-before-wakeup", 1, nullptr, 0 },
{ "rpc-network-threads", 1, nullptr, 0 },
@@ -442,6 +545,7 @@ App::get_options()
LONGOPT_MODE,
LONGOPT_NODES_PER_GROUP,
LONGOPT_REDUNDANCY,
+ LONGOPT_REFEED,
LONGOPT_RESPONSE_THREADS,
LONGOPT_RPC_EVENTS_BEFORE_WAKEUP,
LONGOPT_RPC_NETWORK_THREADS,
@@ -510,6 +614,12 @@ App::get_options()
case LONGOPT_REDUNDANCY:
_bm_params.set_redundancy(atoi(opt_argument));
break;
+ case LONGOPT_REFEED:
+ _bm_params.set_refeed_mode(get_refeed_mode(opt_argument));
+ if (_bm_params.get_refeed_mode() == ReFeedMode::BAD) {
+ std::cerr << "Unknown refeed-mode name " << opt_argument << std::endl;
+ }
+ break;
case LONGOPT_RESPONSE_THREADS:
_bm_params.set_response_threads(atoi(opt_argument));
break;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h
index 69562d42276..deccb2520f5 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/avg_sampler.h
@@ -7,20 +7,21 @@
namespace search::bmcluster {
/*
- * Class used to calculate average value of samples.
+ * Class used to calculate average feed rate.
*/
class AvgSampler {
private:
- double _total;
- size_t _samples;
+ uint64_t _ops;
+ double _elapsed;
public:
- AvgSampler() : _total(0), _samples(0) {}
- void sample(double val) {
- _total += val;
- ++_samples;
+ AvgSampler() : _ops(0), _elapsed(0.0) {}
+ void sample(uint64_t ops, double elapsed) {
+ _ops += ops;
+ _elapsed += elapsed;
}
- double avg() const { return _total / (double)_samples; }
+ double avg() const { return valid() ? (_ops / _elapsed) : 0.0; }
+ bool valid() const { return _elapsed != 0.0; }
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
index 55160f5d561..0e659e45136 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
@@ -37,7 +37,8 @@ BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler&
_feed_handler(feed_handler),
_executor(executor),
_all_fields(document::AllFields::NAME),
- _use_timestamp(!_feed_handler.manages_timestamp())
+ _use_timestamp(!_feed_handler.manages_timestamp()),
+ _stop(false)
{
}
@@ -91,7 +92,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed,
}
-void
+uint32_t
BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
{
@@ -99,11 +100,14 @@ BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostre
PendingTracker pending_tracker(max_pending);
_feed_handler.attach_bucket_info_queue(pending_tracker);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- for (uint32_t i = range.get_start(); i < range.get_end(); ++i) {
+ uint32_t op_count = 0;
+ for (uint32_t i = range.get_start(); i < range.get_end() && !_stop.load(std::memory_order_relaxed); ++i) {
feed_operation(i, is, time_bias, pending_tracker);
+ ++op_count;
}
- assert(is.empty());
+ assert(is.empty() || _stop.load(std::memory_order_relaxed));
pending_tracker.drain();
+ return op_count;
}
void
@@ -111,19 +115,37 @@ BmFeeder::run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespali
{
uint32_t old_errors = _feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
+ std::atomic<uint32_t> atomic_op_count(0u);
for (uint32_t i = 0; i < params.get_client_threads(); ++i) {
auto range = params.get_range(i);
- _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { feed_task(max_pending, range, serialized_feed, time_bias); }));
+ _executor.execute(makeLambdaTask([this, max_pending = params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias, &atomic_op_count]()
+ { atomic_op_count += feed_task(max_pending, range, serialized_feed, time_bias); }));
}
_executor.sync();
+ uint32_t op_count = atomic_op_count.load(std::memory_order_relaxed);
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
uint32_t new_errors = _feed_handler.get_error_count() - old_errors;
- double throughput = params.get_documents() / elapsed.count();
- sampler.sample(throughput);
- LOG(info, "%sAsync: pass=%u, errors=%u, %ss/s: %8.2f", op_name.c_str(), pass, new_errors, op_name.c_str(), throughput);
+ double throughput = op_count / elapsed.count();
+ sampler.sample(op_count, elapsed.count());
+ LOG(info, "%sAsync: pass=%u, errors=%u, ops=%u of %u, %ss/s: %8.2f", op_name.c_str(), pass, new_errors, op_count, params.get_documents(), op_name.c_str(), throughput);
time_bias += params.get_documents();
}
+void
+BmFeeder::stop()
+{
+ _stop.store(true, std::memory_order_release);
+}
+
+void
+BmFeeder::run_feed_tasks_loop(int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, const vespalib::string &op_name)
+{
+ AvgSampler sampler;
+ for (int pass = 0; !_stop.load(std::memory_order_relaxed); ++pass) {
+ run_feed_tasks(pass, time_bias, serialized_feed_v, params, sampler, op_name);
+ }
+ LOG(info, "%sAsync: AVG %s/s: %8.2f", op_name.c_str(), op_name.c_str(), sampler.avg());
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
index 1001f572bf2..c8236e7222b 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
@@ -4,6 +4,7 @@
#include <vespa/document/bucket/bucketspace.h>
#include <memory>
+#include <atomic>
namespace document {
@@ -36,13 +37,16 @@ class BmFeeder {
vespalib::ThreadStackExecutor& _executor;
vespalib::string _all_fields;
bool _use_timestamp;
+ std::atomic<bool> _stop;
public:
BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor);
~BmFeeder();
void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker);
- void feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ uint32_t feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
void run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler, const vespalib::string& op_name);
IBmFeedHandler& get_feed_handler() const { return _feed_handler; }
+ void stop();
+ void run_feed_tasks_loop(int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, const vespalib::string &op_name);
};
}