diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-28 20:31:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-28 20:31:25 +0200 |
commit | f0ee81a4dea657f1bfb1ece2b87c7fba9bc8e26d (patch) | |
tree | 15e224fd041f3c0681697fe25193ccb35cbf2cd5 | |
parent | da128ad1035532d1097750b46ad17518b1e2df83 (diff) | |
parent | 928c9b1bb606b2c51c2bcc704583f21a0392f222 (diff) |
Merge pull request #14602 from vespa-engine/toregge/add-distributor-to-vespa-feed-bm
Add distributor "process" to vespa-feed-bm.
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 190 |
1 files changed, 158 insertions, 32 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 73d741c3fee..6fcb75308db 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -62,6 +62,9 @@ #include <vespa/messagebus/config-messagebus.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storageserver/app/distributorprocess.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/config/config-stor-visitordispatcher.h> #include <getopt.h> #include <iostream> @@ -87,9 +90,11 @@ using vespa::config::content::core::StorOpsloggerConfigBuilder; using vespa::config::content::core::StorPrioritymappingConfigBuilder; using vespa::config::content::LoadTypeConfigBuilder; using vespa::config::content::UpgradingConfigBuilder; +using vespa::config::content::core::StorDistributormanagerConfigBuilder; using vespa::config::content::core::StorServerConfigBuilder; using vespa::config::content::core::StorStatusConfigBuilder; using vespa::config::content::core::StorVisitorConfigBuilder; +using vespa::config::content::core::StorVisitordispatcherConfigBuilder; using metrics::MetricsmanagerConfigBuilder; using cloud::config::SlobroksConfigBuilder; using messagebus::MessagebusConfigBuilder; @@ -211,6 +216,7 @@ class BMParams { uint32_t _update_passes; uint32_t _remove_passes; uint32_t _rpc_network_threads; + bool _enable_distributor; bool _enable_service_layer; bool _use_storage_chain; uint32_t get_start(uint32_t thread_id) const { @@ -224,6 +230,7 @@ public: _update_passes(1), _remove_passes(2), _rpc_network_threads(1), + _enable_distributor(false), _enable_service_layer(false), _use_storage_chain(false) { @@ -237,6 +244,7 @@ public: uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } + bool get_enable_distributor() const { return _enable_distributor; } bool get_enable_service_layer() const { return _enable_service_layer; } bool get_use_storage_chain() const { return _use_storage_chain; } void set_documents(uint32_t documents_in) { _documents = documents_in; } @@ -245,6 +253,7 @@ public: void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } + void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } bool check() const; @@ -273,6 +282,10 @@ BMParams::check() const std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; return false; } + if (_enable_distributor && !_enable_service_layer) { + std::cerr << "Service layer must be enabled if distributor layer is enabled" << std::endl; + return false; + } return true; } @@ -322,39 +335,31 @@ struct MyStorageConfig { vespalib::string config_id; DocumenttypesConfigBuilder documenttypes; - PersistenceConfigBuilder persistence; StorDistributionConfigBuilder stor_distribution; - StorFilestorConfigBuilder stor_filestor; StorBouncerConfigBuilder stor_bouncer; StorCommunicationmanagerConfigBuilder stor_communicationmanager; - StorBucketInitConfigBuilder stor_bucket_init; StorOpsloggerConfigBuilder stor_opslogger; StorPrioritymappingConfigBuilder stor_prioritymapping; UpgradingConfigBuilder upgrading; StorServerConfigBuilder stor_server; StorStatusConfigBuilder stor_status; - StorVisitorConfigBuilder stor_visitor; BucketspacesConfigBuilder bucketspaces; LoadTypeConfigBuilder load_type; MetricsmanagerConfigBuilder metricsmanager; SlobroksConfigBuilder slobroks; MessagebusConfigBuilder messagebus; - MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port, uint32_t rpc_network_threads) + MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads) : config_id(config_id_in), documenttypes(documenttypes_in), - persistence(), stor_distribution(), - stor_filestor(), stor_bouncer(), stor_communicationmanager(), - stor_bucket_init(), stor_opslogger(), stor_prioritymapping(), upgrading(), stor_server(), stor_status(), - stor_visitor(), bucketspaces(), load_type(), metricsmanager(), @@ -379,7 +384,12 @@ struct MyStorageConfig dc.redundancy = 1; dc.readyCopies = 1; } - stor_server.rootFolder = "storage"; + stor_server.isDistributor = distributor; + if (distributor) { + stor_server.rootFolder = "distributor"; + } else { + stor_server.rootFolder = "storage"; + } { SlobroksConfigBuilder::Slobrok slobrok; slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); @@ -387,6 +397,9 @@ struct MyStorageConfig } stor_communicationmanager.useDirectStorageapiRpc = true; stor_communicationmanager.rpc.numNetworkThreads = rpc_network_threads; + stor_communicationmanager.mbusport = mbus_port; + stor_communicationmanager.rpcport = rpc_port; + stor_status.httpport = status_port; } @@ -394,18 +407,14 @@ struct MyStorageConfig void add_builders(ConfigSet &set) { set.addBuilder(config_id, &documenttypes); - set.addBuilder(config_id, &persistence); set.addBuilder(config_id, &stor_distribution); - set.addBuilder(config_id, &stor_filestor); set.addBuilder(config_id, &stor_bouncer); set.addBuilder(config_id, &stor_communicationmanager); - set.addBuilder(config_id, &stor_bucket_init); set.addBuilder(config_id, &stor_opslogger); set.addBuilder(config_id, &stor_prioritymapping); set.addBuilder(config_id, &upgrading); set.addBuilder(config_id, &stor_server); set.addBuilder(config_id, &stor_status); - set.addBuilder(config_id, &stor_visitor); set.addBuilder(config_id, &bucketspaces); set.addBuilder(config_id, &load_type); set.addBuilder(config_id, &metricsmanager); @@ -416,6 +425,58 @@ struct MyStorageConfig MyStorageConfig::~MyStorageConfig() = default; +struct MyServiceLayerConfig : public MyStorageConfig +{ + PersistenceConfigBuilder persistence; + StorFilestorConfigBuilder stor_filestor; + StorBucketInitConfigBuilder stor_bucket_init; + StorVisitorConfigBuilder stor_visitor; + + MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, uint32_t rpc_network_threads) + : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads), + persistence(), + stor_filestor(), + stor_bucket_init(), + stor_visitor() + { + } + + ~MyServiceLayerConfig(); + + void add_builders(ConfigSet &set) { + MyStorageConfig::add_builders(set); + set.addBuilder(config_id, &persistence); + set.addBuilder(config_id, &stor_filestor); + set.addBuilder(config_id, &stor_bucket_init); + set.addBuilder(config_id, &stor_visitor); + } +}; + +MyServiceLayerConfig::~MyServiceLayerConfig() = default; + +struct MyDistributorConfig : public MyStorageConfig +{ + StorDistributormanagerConfigBuilder stor_distributormanager; + StorVisitordispatcherConfigBuilder stor_visitordispatcher; + + MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads) + : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads), + stor_distributormanager(), + stor_visitordispatcher() + { + } + + ~MyDistributorConfig(); + + void add_builders(ConfigSet &set) { + MyStorageConfig::add_builders(set); + set.addBuilder(config_id, &stor_distributormanager); + set.addBuilder(config_id, &stor_visitordispatcher); + } +}; + +MyDistributorConfig::~MyDistributorConfig() = default; + struct MyRpcClientConfig { vespalib::string config_id; SlobroksConfigBuilder slobroks; @@ -452,8 +513,13 @@ struct PersistenceProviderFixture { DummyFileHeaderContext _file_header_context; int _tls_listen_port; int _slobrok_port; - int _status_port; int _rpc_client_port; + int _service_layer_mbus_port; + int _service_layer_rpc_port; + int _service_layer_status_port; + int _distributor_mbus_port; + int _distributor_rpc_port; + int _distributor_status_port; TransLogServer _tls; vespalib::string _tls_spec; matching::QueryLimiter _query_limiter; @@ -468,14 +534,17 @@ struct PersistenceProviderFixture { MyResourceWriteFilter _write_filter; std::shared_ptr<PersistenceEngine> _persistence_engine; uint32_t _bucket_bits; - MyStorageConfig _service_layer_config; + MyServiceLayerConfig _service_layer_config; + MyDistributorConfig _distributor_config; MyRpcClientConfig _rpc_client_config; ConfigSet _config_set; std::shared_ptr<IConfigContext> _config_context; std::unique_ptr<IBmFeedHandler> _feed_handler; std::unique_ptr<mbus::Slobrok> _slobrok; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> _service_layer_chain_context; std::unique_ptr<MyServiceLayerProcess> _service_layer; std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; + std::unique_ptr<storage::DistributorProcess> _distributor; PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); @@ -487,7 +556,11 @@ struct PersistenceProviderFixture { std::unique_ptr<Document> make_document(uint32_t i) const; std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; void create_buckets(); - void start_service_layer(bool use_storage_chain); + void start_service_layer(const BMParams& params); + void start_distributor(const BMParams& params); + void create_feed_handler(const BMParams& params); + void shutdown_feed_handler(); + void shutdown_distributor(); void shutdown_service_layer(); }; @@ -502,8 +575,13 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _file_header_context(), _tls_listen_port(9017), _slobrok_port(9018), - _status_port(9019), - _rpc_client_port(9020), + _rpc_client_port(9019), + _service_layer_mbus_port(9020), + _service_layer_rpc_port(9021), + _service_layer_status_port(9022), + _distributor_mbus_port(9023), + _distributor_rpc_port(9024), + _distributor_status_port(9025), _tls("tls", _tls_listen_port, _base_dir, _file_header_context), _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), _query_limiter(), @@ -518,20 +596,24 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _write_filter(), _persistence_engine(), _bucket_bits(16), - _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()), + _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params.get_rpc_network_threads()), + _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params.get_rpc_network_threads()), _rpc_client_config("bm-rpc-client", _slobrok_port), _config_set(), _config_context(std::make_shared<ConfigContext>(_config_set)), _feed_handler(), _slobrok(), + _service_layer_chain_context(), _service_layer(), - _rpc_client_shared_rpc_resources() + _rpc_client_shared_rpc_resources(), + _distributor() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); _service_layer_config.add_builders(_config_set); + _distributor_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } @@ -626,17 +708,16 @@ PersistenceProviderFixture::create_buckets() } void -PersistenceProviderFixture::start_service_layer(bool use_storage_chain) +PersistenceProviderFixture::start_service_layer(const BMParams& params) { LOG(info, "start slobrok"); _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); std::unique_ptr<storage::IStorageChainBuilder> chain_builder; - std::shared_ptr<StorageApiChainBmFeedHandler::Context> context; - if (use_storage_chain) { - context = StorageApiChainBmFeedHandler::get_context(); - chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context); + if (params.get_use_storage_chain()) { + _service_layer_chain_context = StorageApiChainBmFeedHandler::get_context(); + chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(_service_layer_chain_context); } _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, *_persistence_engine, @@ -648,17 +729,50 @@ PersistenceProviderFixture::start_service_layer(bool use_storage_chain) config::ConfigUri client_config_uri("bm-rpc-client", _config_context); _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); - if (use_storage_chain) { - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context)); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); +} + +void +PersistenceProviderFixture::start_distributor(const BMParams& params) +{ + if (params.get_enable_distributor()) { + config::ConfigUri config_uri("bm-distributor", _config_context); + _distributor = std::make_unique<storage::DistributorProcess>(config_uri); + _distributor->setupConfig(100ms); + _distributor->createNode(); } } void -PersistenceProviderFixture::shutdown_service_layer() +PersistenceProviderFixture::create_feed_handler(const BMParams& params) +{ + if (params.get_enable_service_layer()) { + if (params.get_use_storage_chain()) { + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + } + } +} + +void +PersistenceProviderFixture::shutdown_feed_handler() { _feed_handler.reset(); +} + +void +PersistenceProviderFixture::shutdown_distributor() +{ + if (_distributor) { + LOG(info, "stop distributor"); + _distributor->getNode().requestShutdown("controlled shutdown"); + _distributor->shutdown(); + } +} + +void +PersistenceProviderFixture::shutdown_service_layer() +{ if (_rpc_client_shared_rpc_resources) { LOG(info, "stop rpc client shared resources"); _rpc_client_shared_rpc_resources->shutdown(); @@ -851,8 +965,12 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); if (bm_params.get_enable_service_layer()) { - f.start_service_layer(bm_params.get_use_storage_chain()); + f.start_service_layer(bm_params); } + if (bm_params.get_enable_distributor()) { + f.start_distributor(bm_params); + } + f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update"); @@ -867,6 +985,8 @@ void benchmark_async_spi(const BMParams &bm_params) for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) { run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params); } + f.shutdown_feed_handler(); + f.shutdown_distributor(); f.shutdown_service_layer(); } @@ -903,6 +1023,7 @@ App::usage() "[--update-passes update-passes]\n" "[--remove-passes remove-passes]\n" "[--rpc-network-threads threads]\n" + "[--enable-distributor]\n" "[--enable-service-layer]\n" "[--use-storage-chain]" << std::endl; } @@ -920,6 +1041,7 @@ App::get_options() { "update-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, { "rpc-network-threads", 1, nullptr, 0 }, + { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, { "use-storage-chain", 0, nullptr, 0 } }; @@ -930,6 +1052,7 @@ App::get_options() LONGOPT_UPDATE_PASSES, LONGOPT_REMOVE_PASSES, LONGOPT_RPC_NETWORK_THREADS, + LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, LONGOPT_USE_STORAGE_CHAIN }; @@ -957,6 +1080,9 @@ App::get_options() case LONGOPT_RPC_NETWORK_THREADS: _bm_params.set_rpc_network_threads(atoi(opt_argument)); break; + case LONGOPT_ENABLE_DISTRIBUTOR: + _bm_params.set_enable_distributor(true); + break; case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; |