diff options
7 files changed, 115 insertions, 39 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 3dac66685f9..0fe673cebe0 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -106,6 +106,10 @@ BMParams::check() const std::cerr << "Put passes too low: " << _put_passes << std::endl; return false; } + if (get_groups() > 0 && !needs_distributor()) { + std::cerr << "grouped distribution only allowed when using distributor" << std::endl; + return false; + } return true; } @@ -211,24 +215,25 @@ App::usage() "[--bucket-db-stripe-bits bits]\n" "[--client-threads threads]\n" "[--distributor-stripes stripes]\n" + "[--documents documents]\n" + "[--enable-distributor]\n" + "[--enable-service-layer]\n" "[--get-passes get-passes]\n" + "[--groups groups]\n" "[--indexing-sequencer [latency,throughput,adaptive]]\n" "[--max-pending max-pending]\n" - "[--documents documents]\n" - "[--nodes nodes]\n" + "[--nodes-per-group nodes-per-group]\n" "[--put-passes put-passes]\n" - "[--update-passes update-passes]\n" "[--remove-passes remove-passes]\n" + "[--response-threads threads]\n" "[--rpc-events-before-wakeup events]\n" "[--rpc-network-threads threads]\n" "[--rpc-targets-per-node targets]\n" - "[--response-threads threads]\n" - "[--enable-distributor]\n" - "[--enable-service-layer]\n" "[--skip-communicationmanager-thread]\n" "[--skip-get-spi-bucket-info]\n" - "[--use-document-api]\n" + "[--update-passes update-passes]\n" "[--use-async-message-handling]\n" + "[--use-document-api]\n" "[--use-message-bus\n" "[--use-storage-chain]" << std::endl; } @@ -247,9 +252,10 @@ App::get_options() { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, { "get-passes", 1, nullptr, 0 }, + { "groups", 1, nullptr, 0 }, { "indexing-sequencer", 1, nullptr, 0 }, { "max-pending", 1, nullptr, 0 }, - { "nodes", 1, nullptr, 0 }, + { "nodes-per-group", 1, nullptr, 0 }, { "put-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, { "response-threads", 1, nullptr, 0 }, @@ -272,9 +278,10 @@ App::get_options() LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, LONGOPT_GET_PASSES, + LONGOPT_GROUPS, LONGOPT_INDEXING_SEQUENCER, LONGOPT_MAX_PENDING, - LONGOPT_NODES, + LONGOPT_NODES_PER_GROUP, LONGOPT_PUT_PASSES, LONGOPT_REMOVE_PASSES, LONGOPT_RESPONSE_THREADS, @@ -316,14 +323,17 @@ App::get_options() case LONGOPT_GET_PASSES: _bm_params.set_get_passes(atoi(opt_argument)); break; + case LONGOPT_GROUPS: + _bm_params.set_groups(atoi(opt_argument)); + break; case LONGOPT_INDEXING_SEQUENCER: _bm_params.set_indexing_sequencer(opt_argument); break; case LONGOPT_MAX_PENDING: _bm_params.set_max_pending(atoi(opt_argument)); break; - case LONGOPT_NODES: - _bm_params.set_num_nodes(atoi(opt_argument)); + case LONGOPT_NODES_PER_GROUP: + _bm_params.set_nodes_per_group(atoi(opt_argument)); break; case LONGOPT_PUT_PASSES: _bm_params.set_put_passes(atoi(opt_argument)); diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index d9c2c1eda3e..1b8f93b381f 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -128,7 +128,7 @@ BMParams::BMParams() set_enable_service_layer(true); set_enable_distributor(true); set_use_document_api(true); - set_num_nodes(4); + set_nodes_per_group(4); } @@ -370,10 +370,11 @@ App::usage() "[--distributor-stripes stripes]\n" "[--documents documents]\n" "[--flip-nodes flip-nodes]\n" + "[--groups groups]\n" "[--indexing-sequencer [latency,throughput,adaptive]]\n" "[--max-pending max-pending]\n" "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n" - "[--nodes nodes]\n" + "[--nodes-per-group nodes-per-group]\n" "[--redundancy redundancy]\n" "[--rpc-events-before-wakeup events]\n" "[--rpc-network-threads threads]\n" @@ -396,10 +397,11 @@ App::get_options() { "distributor-stripes", 1, nullptr, 0 }, { "documents", 1, nullptr, 0 }, { "flip-nodes", 1, nullptr, 0 }, + { "groups", 1, nullptr, 0 }, { "indexing-sequencer", 1, nullptr, 0 }, { "max-pending", 1, nullptr, 0 }, { "mode", 1, nullptr, 0 }, - { "nodes", 1, nullptr, 0 }, + { "nodes-per-group", 1, nullptr, 0 }, { "redundancy", 1, nullptr, 0 }, { "response-threads", 1, nullptr, 0 }, { "rpc-events-before-wakeup", 1, nullptr, 0 }, @@ -415,10 +417,11 @@ App::get_options() LONGOPT_DISTRIBUTOR_STRIPES, LONGOPT_DOCUMENTS, LONGOPT_FLIP_NODES, + LONGOPT_GROUPS, LONGOPT_INDEXING_SEQUENCER, LONGOPT_MAX_PENDING, LONGOPT_MODE, - LONGOPT_NODES, + LONGOPT_NODES_PER_GROUP, LONGOPT_REDUNDANCY, LONGOPT_RESPONSE_THREADS, LONGOPT_RPC_EVENTS_BEFORE_WAKEUP, @@ -449,6 +452,9 @@ App::get_options() case LONGOPT_FLIP_NODES: _bm_params.set_flip_nodes(atoi(opt_argument)); break; + case LONGOPT_GROUPS: + _bm_params.set_groups(atoi(opt_argument)); + break; case LONGOPT_INDEXING_SEQUENCER: _bm_params.set_indexing_sequencer(opt_argument); break; @@ -461,8 +467,8 @@ App::get_options() std::cerr << "Unknown mode name " << opt_argument << std::endl; } break; - case LONGOPT_NODES: - _bm_params.set_num_nodes(atoi(opt_argument)); + case LONGOPT_NODES_PER_GROUP: + _bm_params.set_nodes_per_group(atoi(opt_argument)); break; case LONGOPT_REDUNDANCY: _bm_params.set_redundancy(atoi(opt_argument)); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp index 4042b809294..a5c4fe6851a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -149,7 +149,7 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl _document_types(std::move(document_types)), _repo(std::move(repo)), _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)), - _real_distribution(std::make_shared<BmDistribution>(params.get_num_nodes(), params.get_redundancy())), + _real_distribution(std::make_shared<BmDistribution>(params.get_groups(), params.get_nodes_per_group(), params.get_redundancy())), _distribution(_real_distribution), _nodes(params.get_num_nodes()), _cluster_controller(std::make_shared<BmClusterController>(*this, *_distribution)), diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index f31f5fb6b88..b8319529a3b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -10,8 +10,10 @@ BmClusterParams::BmClusterParams() _distributor_stripes(0), _enable_distributor(false), _enable_service_layer(false), + _groups(0), _indexing_sequencer(), _num_nodes(1), + _nodes_per_group(1), _redundancy(1), _response_threads(2), // Same default as in stor-filestor.def _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def @@ -24,6 +26,7 @@ BmClusterParams::BmClusterParams() _use_message_bus(false), _use_storage_chain(false) { + recalc_nodes(); } BmClusterParams::~BmClusterParams() = default; @@ -43,11 +46,31 @@ BmClusterParams::check() const std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl; return false; } - if (_num_nodes < _redundancy) { - std::cerr << "Too high redundancy " << _redundancy << " with " << _num_nodes << " nodes" << std::endl; + if (_nodes_per_group < _redundancy) { + std::cerr << "Too high redundancy " << _redundancy << " with " << _nodes_per_group << " nodes per group" << std::endl; return false; } return true; } +void +BmClusterParams::recalc_nodes() +{ + _num_nodes = std::max(1u, _groups) * _nodes_per_group; +} + +void +BmClusterParams::set_groups(uint32_t value) +{ + _groups = value; + recalc_nodes(); +} + +void +BmClusterParams::set_nodes_per_group(uint32_t value) +{ + _nodes_per_group = value; + recalc_nodes(); +} + } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index 68dff7f52ed..fbb16ba740f 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -16,8 +16,10 @@ class BmClusterParams uint32_t _distributor_stripes; bool _enable_distributor; bool _enable_service_layer; + uint32_t _groups; vespalib::string _indexing_sequencer; uint32_t _num_nodes; + uint32_t _nodes_per_group; uint32_t _redundancy; uint32_t _response_threads; uint32_t _rpc_events_before_wakeup; @@ -29,13 +31,16 @@ class BmClusterParams bool _use_document_api; bool _use_message_bus; bool _use_storage_chain; + void recalc_nodes(); public: BmClusterParams(); ~BmClusterParams(); uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } uint32_t get_distributor_stripes() const { return _distributor_stripes; } bool get_enable_distributor() const { return _enable_distributor; } + uint32_t get_groups() const noexcept { return _groups; } const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } + uint32_t get_nodes_per_group() const noexcept { return _nodes_per_group; } uint32_t get_num_nodes() const { return _num_nodes; } uint32_t get_redundancy() const { return _redundancy; } uint32_t get_response_threads() const { return _response_threads; } @@ -55,8 +60,9 @@ public: void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; } void set_enable_distributor(bool value) { _enable_distributor = value; } void set_enable_service_layer(bool value) { _enable_service_layer = value; } + void set_groups(uint32_t value); void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } - void set_num_nodes(uint32_t value) { _num_nodes = value; } + void set_nodes_per_group(uint32_t value); void set_redundancy(uint32_t value) { _redundancy = value; } void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp index 2ab1d4fd7f7..f2a4bf49c50 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp @@ -18,27 +18,58 @@ using DistributionConfigBuilder = BmDistribution::DistributionConfigBuilder; namespace { +void +add_nodes_to_group(DistributionConfigBuilder::Group &group, uint32_t first_node_idx, uint32_t nodes_per_group) +{ + for (uint32_t i = 0; i < nodes_per_group; ++i) { + DistributionConfigBuilder::Group::Nodes node; + node.index = first_node_idx + i; + group.nodes.push_back(std::move(node)); + } +} + BmDistribution::DistributionConfig -make_distribution_config(uint32_t num_nodes, uint32_t redundancy) +make_distribution_config(uint32_t nodes_per_group, uint32_t groups, uint32_t redundancy) { DistributionConfigBuilder dc; { - DistributionConfigBuilder::Group group; { - for (uint32_t i = 0; i < num_nodes; ++i) { - DistributionConfigBuilder::Group::Nodes node; - node.index = i; - group.nodes.push_back(std::move(node)); + DistributionConfigBuilder::Group group; + group.index = "invalid"; + group.name = "invalid"; + group.capacity = 1.0; + if (groups == 0u) { + add_nodes_to_group(group, 0, nodes_per_group); + group.partitions = ""; + dc.redundancy = redundancy; + dc.readyCopies = redundancy; + } else { + vespalib::asciistream partitions; + for (uint32_t group_idx = 0; group_idx < groups; ++group_idx) { + if (group_idx + 1< groups) { + partitions << redundancy << '|'; + } else { + partitions << '*'; + } + } + group.partitions = partitions.str(); + dc.redundancy = redundancy * groups; + dc.readyCopies = redundancy * groups; } + dc.group.push_back(std::move(group)); + } + uint32_t node_idx = 0; + for (uint32_t group_idx = 0; group_idx < groups; ++group_idx) { + DistributionConfigBuilder::Group group; + group.index = std::to_string(group_idx); + group.name = "group_" + group.index; + group.capacity = 1.0; + group.partitions = ""; + add_nodes_to_group(group, node_idx, nodes_per_group); + node_idx += nodes_per_group; + dc.group.push_back(std::move(group)); } - group.index = "invalid"; - group.name = "invalid"; - group.capacity = 1.0; - group.partitions = ""; - dc.group.push_back(std::move(group)); } - dc.redundancy = redundancy; - dc.readyCopies = redundancy; return dc; } @@ -52,11 +83,11 @@ make_cluster_state(uint32_t num_nodes) } -BmDistribution::BmDistribution(uint32_t num_nodes, uint32_t redundancy) - : _num_nodes(num_nodes), - _distribution_config(make_distribution_config(num_nodes, redundancy)), +BmDistribution::BmDistribution(uint32_t groups, uint32_t nodes_per_group, uint32_t redundancy) + : _num_nodes(std::max(1u, groups) * nodes_per_group), + _distribution_config(make_distribution_config(nodes_per_group, groups, redundancy)), _distribution(_distribution_config), - _pending_cluster_state(make_cluster_state(num_nodes)), + _pending_cluster_state(make_cluster_state(_num_nodes)), _cluster_state_bundle(_pending_cluster_state), _has_pending_cluster_state(false) { diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h index 6be7592f561..9963843eea9 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h @@ -23,7 +23,7 @@ class BmDistribution : public IBmDistribution storage::lib::ClusterStateBundle _cluster_state_bundle; bool _has_pending_cluster_state; public: - BmDistribution(uint32_t num_nodes, uint32_t redundancy); + BmDistribution(uint32_t groups, uint32_t nodes_per_group, uint32_t redundancy); ~BmDistribution() override; uint32_t get_num_nodes() const override; uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const override; |