summary | refs | log | tree | commit | diff | stats
path: root/searchcore/src/apps/vespa-redistribute-bm
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@online.no> 2021-10-06 12:45:35 +0200
committer Tor Egge <Tor.Egge@online.no> 2021-10-06 12:49:23 +0200
commit 780f8c25677588810fa55b82692c468603162d66 (patch)
tree bdefe091f0de4f7506f0437b2c8d74310f931608 /searchcore/src/apps/vespa-redistribute-bm
parent f63d33eb7a47c3bfd80229098a71eae1b3b48300 (diff)
Enable refeed during document redistribution.
Diffstat (limited to 'searchcore/src/apps/vespa-redistribute-bm')
-rw-r--r-- searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp 120
1 files changed, 115 insertions, 5 deletions
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
index 3fc42e50cdb..1272521ff54 100644
--- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -54,6 +54,7 @@ using search::bmcluster::BmRange;
using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
using storage::lib::State;
+using vespalib::makeLambdaTask;
namespace {
@@ -74,7 +75,7 @@ enum class Mode {
PERM_CRASH,
TEMP_CRASH,
REPLACE,
- BAD,
+ BAD
};
std::vector<vespalib::string> mode_names = {
@@ -101,19 +102,44 @@ vespalib::string& get_mode_name(Mode mode) {
return (i < mode_names.size()) ? mode_names[i] : bad_mode_name;
}
+// Operation replayed against the cluster while documents are being
+// redistributed; selected with --refeed-mode.
+// The enumerator order must match refeed_mode_names, because
+// get_refeed_mode() casts the index of the matching name to this type.
+// Scoped (like Mode) so that NONE/PUT/UPDATE/BAD do not leak into the
+// enclosing namespace.
+enum class ReFeedMode {
+    NONE,   // redistribute() starts no background refeed
+    PUT,    // redistribute() refeeds the put feed
+    UPDATE, // redistribute() refeeds the update feed
+    BAD     // unknown name; rejected by BMParams::check()
+};
+
+// Command line names for the refeed modes, indexed by enumerator value:
+// get_refeed_mode() casts the index of the matching entry to ReFeedMode, so
+// the order here must mirror the enum. ReFeedMode::BAD has no name on purpose;
+// it is what an unknown name maps to.
+// Declared const because the table is only ever read.
+const std::vector<vespalib::string> refeed_mode_names = {
+    "none",
+    "put",
+    "update"
+};
+
+// Translates a refeed mode name into its ReFeedMode value.
+// The position of the name in refeed_mode_names is the enumerator value.
+// Returns ReFeedMode::BAD when the name is not in the table.
+ReFeedMode get_refeed_mode(const vespalib::string& refeed_mode_name) {
+    uint32_t position = 0;
+    for (const auto& known_name : refeed_mode_names) {
+        if (refeed_mode_name == known_name) {
+            return static_cast<ReFeedMode>(position);
+        }
+        ++position;
+    }
+    return ReFeedMode::BAD;
+}
+
class BMParams : public BmClusterParams, // benchmark parameters: cluster and feed settings plus the options specific to this app
public BmFeedParams
{
uint32_t _flip_nodes; // initialized to 1 by the constructor; NOTE(review): exact meaning is not visible in this hunk
Mode _mode; // initialized to Mode::GROW by the constructor
+ ReFeedMode _refeed_mode; // set from --refeed-mode; initialized to ReFeedMode::NONE by the constructor
bool _use_feed_settle; // initialized to false; when true, Benchmark::feed() logs "Settling feed" after the initial feed
public:
BMParams();
uint32_t get_flip_nodes() const noexcept { return _flip_nodes; }
Mode get_mode() const noexcept { return _mode; }
+ ReFeedMode get_refeed_mode() const noexcept { return _refeed_mode; } // read by Benchmark::make_feed() and Benchmark::redistribute()
bool get_use_feed_settle() const noexcept { return _use_feed_settle; }
void set_flip_nodes(uint32_t value) { _flip_nodes = value; }
void set_mode(Mode value) { _mode = value; }
+ void set_refeed_mode(ReFeedMode value) { _refeed_mode = value; } // may store ReFeedMode::BAD; check() reports it afterwards
void set_use_feed_settle(bool value) { _use_feed_settle = value; }
bool check() const; // returns false (after printing "Bad refeed-mode" to stderr) when _refeed_mode is ReFeedMode::BAD
};
@@ -123,6 +149,7 @@ BMParams::BMParams()
BmFeedParams(),
_flip_nodes(1u),
_mode(Mode::GROW),
+ _refeed_mode(ReFeedMode::NONE),
_use_feed_settle(false)
{
set_enable_service_layer(true);
@@ -160,9 +187,55 @@ BMParams::check() const
std::cerr << "Bad mode" << std::endl;
return false;
}
+ if (_refeed_mode == ReFeedMode::BAD) {
+ std::cerr << "Bad refeed-mode" << std::endl;
+ return false;
+ }
return true;
}
+/*
+ * Replays a prepared feed in the background for as long as the object lives.
+ *
+ * The constructor queues run() on _top_executor. run() sleeps for two seconds
+ * and then calls BmFeeder::run_feed_tasks_loop() on the feed. The destructor
+ * stops the feeder and then syncs and shuts down _top_executor.
+ *
+ * params, time_bias and feed are kept as references, so the caller must keep
+ * them alive for the lifetime of this object.
+ */
+class ReFeed {
+public:
+    ReFeed(const BMParams& params,
+           std::shared_ptr<const DocumentTypeRepo> repo,
+           IBmFeedHandler& feed_handler,
+           int64_t& time_bias,
+           const std::vector<vespalib::nbostream>& feed,
+           const vespalib::string& op_name);
+    ~ReFeed();
+
+private:
+    // Body of the background task queued by the constructor.
+    void run();
+
+    // Declaration order is initialization order: _feeder is constructed with
+    // a reference to _executor, so _executor must be declared before it.
+    vespalib::ThreadStackExecutor           _top_executor; // runs run()
+    vespalib::ThreadStackExecutor           _executor;     // handed to _feeder
+    BmFeeder                                _feeder;
+    const vespalib::string                  _op_name;      // passed on to run_feed_tasks_loop()
+    const BMParams&                         _params;
+    int64_t&                                _time_bias;
+    const std::vector<vespalib::nbostream>& _feed;
+};
+
+// Builds the executors and the feeder, then immediately queues run() on
+// _top_executor (constructed with a thread count of 1) so that the refeed
+// proceeds in the background while the caller continues.
+// All members are initialized before the task is queued, so the captured
+// `this` refers to a fully constructed object.
+ReFeed::ReFeed(const BMParams& params,
+               std::shared_ptr<const DocumentTypeRepo> repo,
+               IBmFeedHandler& feed_handler,
+               int64_t& time_bias,
+               const std::vector<vespalib::nbostream>& feed,
+               const vespalib::string& op_name)
+    : _top_executor(1, 128_Ki),
+      _executor(params.get_client_threads(), 128_Ki),
+      _feeder(repo, feed_handler, _executor),
+      _op_name(op_name),
+      _params(params),
+      _time_bias(time_bias),
+      _feed(feed)
+{
+    _top_executor.execute(makeLambdaTask([this]() { run(); }));
+}
+
+ReFeed::~ReFeed() // ends the background refeed before any member is destroyed
+{
+ _feeder.stop(); // presumably makes run_feed_tasks_loop() return; confirm in BmFeeder. NOTE(review): verify a stop issued during run()'s initial sleep is honoured
+ _top_executor.sync(); // presumably blocks until the queued run() task has completed; confirm in ThreadStackExecutor
+ _top_executor.shutdown(); // issued last, after the stop request and the sync
+}
+
+// Background task body: waits a fixed two seconds, then hands the feed to
+// BmFeeder::run_feed_tasks_loop().
+void
+ReFeed::run()
+{
+    // NOTE(review): the delay presumably lets redistribution get under way
+    // before refeeding starts; confirm the intent with the author.
+    constexpr std::chrono::seconds start_delay = 2s;
+    std::this_thread::sleep_for(start_delay);
+    _feeder.run_feed_tasks_loop(_time_bias, _feed, _params, _op_name);
+}
+
}
class Benchmark {
@@ -171,10 +244,14 @@ class Benchmark {
std::shared_ptr<const DocumentTypeRepo> _repo;
std::unique_ptr<BmCluster> _cluster;
BmFeed _feed;
+ std::vector<vespalib::nbostream> _put_feed;
+ std::vector<vespalib::nbostream> _update_feed;
+ int64_t _time_bias;
void adjust_cluster_state_before_feed();
void adjust_cluster_state_after_feed();
void adjust_cluster_state_after_first_redistribution();
+ void make_feed();
void feed();
std::chrono::duration<double> redistribute();
@@ -189,7 +266,10 @@ Benchmark::Benchmark(const BMParams& params)
_document_types(make_document_types()),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
_cluster(std::make_unique<BmCluster>(base_dir, base_port, _params, _document_types, _repo)),
- _feed(_repo)
+ _feed(_repo),
+ _put_feed(),
+ _update_feed(),
+ _time_bias(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count())
{
_cluster->make_nodes();
}
@@ -274,17 +354,25 @@ Benchmark::adjust_cluster_state_after_first_redistribution()
}
void
+Benchmark::make_feed()
+{
+    // Pre-generates the serialized feeds once, so that feed() and the
+    // background refeed in redistribute() can share them.
+    // The put feed is always built; the update feed is only built when the
+    // refeed mode is UPDATE.
+    vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
+    auto put_generator = [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); };
+    _put_feed = _feed.make_feed(executor, _params, put_generator, _feed.num_buckets(), "put");
+    if (_params.get_refeed_mode() != ReFeedMode::UPDATE) {
+        return;
+    }
+    auto update_generator = [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_update_feed(range, bucket_selector); };
+    _update_feed = _feed.make_feed(executor, _params, update_generator, _feed.num_buckets(), "update");
+}
+
+void
Benchmark::feed()
{
vespalib::ThreadStackExecutor executor(_params.get_client_threads(), 128_Ki);
BmFeeder feeder(_repo, *_cluster->get_feed_handler(), executor);
- auto put_feed = _feed.make_feed(executor, _params, [this](BmRange range, BucketSelector bucket_selector) { return _feed.make_put_feed(range, bucket_selector); }, _feed.num_buckets(), "put");
BmNodeStatsReporter reporter(*_cluster, false);
reporter.start(500ms);
- int64_t time_bias = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch() - 24h).count();
LOG(info, "Feed handler is '%s'", feeder.get_feed_handler().get_name().c_str());
AvgSampler sampler;
- feeder.run_feed_tasks(0, time_bias, put_feed, _params, sampler, "put");
+ feeder.run_feed_tasks(0, _time_bias, _put_feed, _params, sampler, "put");
reporter.report_now();
if (_params.get_use_feed_settle()) {
LOG(info, "Settling feed");
@@ -302,6 +390,17 @@ Benchmark::redistribute()
reporter.start(500ms);
_cluster->propagate_cluster_state();
reporter.report_now();
+ std::unique_ptr<ReFeed> refeed;
+ switch (_params.get_refeed_mode()) {
+ case ReFeedMode::PUT:
+ refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _put_feed, "put");
+ break;
+ case ReFeedMode::UPDATE:
+ refeed = std::make_unique<ReFeed>(_params, _repo, *_cluster->get_feed_handler(), _time_bias, _update_feed, "update");
+ break;
+ default:
+ ;
+ }
for (;;) {
auto duration = std::chrono::steady_clock::now() - reporter.get_change_time();
if (duration >= 6s) {
@@ -309,6 +408,7 @@ Benchmark::redistribute()
}
std::this_thread::sleep_for(100ms);
}
+ refeed.reset();
return reporter.get_change_time() - before;
}
@@ -317,6 +417,7 @@ Benchmark::run()
{
adjust_cluster_state_before_feed();
_cluster->start(_feed);
+ make_feed();
feed();
LOG(info, "--------------------------------");
auto old_snapshot = _cluster->get_bucket_db_snapshots();
@@ -382,6 +483,7 @@ App::usage()
"[--mode [grow, shrink, perm-crash, temp-crash, replace]\n"
"[--nodes-per-group nodes-per-group]\n"
"[--redundancy redundancy]\n"
+ "[--refeed-mode [none, put, update]\n"
"[--rpc-events-before-wakeup events]\n"
"[--rpc-network-threads threads]\n"
"[--rpc-targets-per-node targets]\n"
@@ -415,6 +517,7 @@ App::get_options()
{ "mode", 1, nullptr, 0 },
{ "nodes-per-group", 1, nullptr, 0 },
{ "redundancy", 1, nullptr, 0 },
+ { "refeed-mode", 1, nullptr, 0 },
{ "response-threads", 1, nullptr, 0 },
{ "rpc-events-before-wakeup", 1, nullptr, 0 },
{ "rpc-network-threads", 1, nullptr, 0 },
@@ -442,6 +545,7 @@ App::get_options()
LONGOPT_MODE,
LONGOPT_NODES_PER_GROUP,
LONGOPT_REDUNDANCY,
+ LONGOPT_REFEED,
LONGOPT_RESPONSE_THREADS,
LONGOPT_RPC_EVENTS_BEFORE_WAKEUP,
LONGOPT_RPC_NETWORK_THREADS,
@@ -510,6 +614,12 @@ App::get_options()
case LONGOPT_REDUNDANCY:
_bm_params.set_redundancy(atoi(opt_argument));
break;
+ case LONGOPT_REFEED:
+ _bm_params.set_refeed_mode(get_refeed_mode(opt_argument));
+ if (_bm_params.get_refeed_mode() == ReFeedMode::BAD) {
+ std::cerr << "Unknown refeed-mode name " << opt_argument << std::endl;
+ }
+ break;
case LONGOPT_RESPONSE_THREADS:
_bm_params.set_response_threads(atoi(opt_argument));
break;