summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-28 17:46:06 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-28 17:47:02 +0200
commit928c9b1bb606b2c51c2bcc704583f21a0392f222 (patch)
tree3cd6207722e93aad21d1f2aced4ecbccff6a5551 /searchcore
parent651eaba1126536893200fa777ed52c9446ac2c23 (diff)
Add distributor "process" to vespa-feed-bm.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp190
1 files changed, 158 insertions, 32 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 73d741c3fee..6fcb75308db 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -62,6 +62,9 @@
#include <vespa/messagebus/config-messagebus.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storageserver/app/distributorprocess.h>
+#include <vespa/storage/config/config-stor-distributormanager.h>
+#include <vespa/storage/config/config-stor-visitordispatcher.h>
#include <getopt.h>
#include <iostream>
@@ -87,9 +90,11 @@ using vespa::config::content::core::StorOpsloggerConfigBuilder;
using vespa::config::content::core::StorPrioritymappingConfigBuilder;
using vespa::config::content::LoadTypeConfigBuilder;
using vespa::config::content::UpgradingConfigBuilder;
+using vespa::config::content::core::StorDistributormanagerConfigBuilder;
using vespa::config::content::core::StorServerConfigBuilder;
using vespa::config::content::core::StorStatusConfigBuilder;
using vespa::config::content::core::StorVisitorConfigBuilder;
+using vespa::config::content::core::StorVisitordispatcherConfigBuilder;
using metrics::MetricsmanagerConfigBuilder;
using cloud::config::SlobroksConfigBuilder;
using messagebus::MessagebusConfigBuilder;
@@ -211,6 +216,7 @@ class BMParams {
uint32_t _update_passes;
uint32_t _remove_passes;
uint32_t _rpc_network_threads;
+ bool _enable_distributor;
bool _enable_service_layer;
bool _use_storage_chain;
uint32_t get_start(uint32_t thread_id) const {
@@ -224,6 +230,7 @@ public:
_update_passes(1),
_remove_passes(2),
_rpc_network_threads(1),
+ _enable_distributor(false),
_enable_service_layer(false),
_use_storage_chain(false)
{
@@ -237,6 +244,7 @@ public:
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
+ bool get_enable_distributor() const { return _enable_distributor; }
bool get_enable_service_layer() const { return _enable_service_layer; }
bool get_use_storage_chain() const { return _use_storage_chain; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
@@ -245,6 +253,7 @@ public:
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
+ void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
bool check() const;
@@ -273,6 +282,10 @@ BMParams::check() const
std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
return false;
}
+ if (_enable_distributor && !_enable_service_layer) {
+ std::cerr << "Service layer must be enabled if distributor layer is enabled" << std::endl;
+ return false;
+ }
return true;
}
@@ -322,39 +335,31 @@ struct MyStorageConfig
{
vespalib::string config_id;
DocumenttypesConfigBuilder documenttypes;
- PersistenceConfigBuilder persistence;
StorDistributionConfigBuilder stor_distribution;
- StorFilestorConfigBuilder stor_filestor;
StorBouncerConfigBuilder stor_bouncer;
StorCommunicationmanagerConfigBuilder stor_communicationmanager;
- StorBucketInitConfigBuilder stor_bucket_init;
StorOpsloggerConfigBuilder stor_opslogger;
StorPrioritymappingConfigBuilder stor_prioritymapping;
UpgradingConfigBuilder upgrading;
StorServerConfigBuilder stor_server;
StorStatusConfigBuilder stor_status;
- StorVisitorConfigBuilder stor_visitor;
BucketspacesConfigBuilder bucketspaces;
LoadTypeConfigBuilder load_type;
MetricsmanagerConfigBuilder metricsmanager;
SlobroksConfigBuilder slobroks;
MessagebusConfigBuilder messagebus;
- MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port, uint32_t rpc_network_threads)
+ MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads)
: config_id(config_id_in),
documenttypes(documenttypes_in),
- persistence(),
stor_distribution(),
- stor_filestor(),
stor_bouncer(),
stor_communicationmanager(),
- stor_bucket_init(),
stor_opslogger(),
stor_prioritymapping(),
upgrading(),
stor_server(),
stor_status(),
- stor_visitor(),
bucketspaces(),
load_type(),
metricsmanager(),
@@ -379,7 +384,12 @@ struct MyStorageConfig
dc.redundancy = 1;
dc.readyCopies = 1;
}
- stor_server.rootFolder = "storage";
+ stor_server.isDistributor = distributor;
+ if (distributor) {
+ stor_server.rootFolder = "distributor";
+ } else {
+ stor_server.rootFolder = "storage";
+ }
{
SlobroksConfigBuilder::Slobrok slobrok;
slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
@@ -387,6 +397,9 @@ struct MyStorageConfig
}
stor_communicationmanager.useDirectStorageapiRpc = true;
stor_communicationmanager.rpc.numNetworkThreads = rpc_network_threads;
+ stor_communicationmanager.mbusport = mbus_port;
+ stor_communicationmanager.rpcport = rpc_port;
+
stor_status.httpport = status_port;
}
@@ -394,18 +407,14 @@ struct MyStorageConfig
void add_builders(ConfigSet &set) {
set.addBuilder(config_id, &documenttypes);
- set.addBuilder(config_id, &persistence);
set.addBuilder(config_id, &stor_distribution);
- set.addBuilder(config_id, &stor_filestor);
set.addBuilder(config_id, &stor_bouncer);
set.addBuilder(config_id, &stor_communicationmanager);
- set.addBuilder(config_id, &stor_bucket_init);
set.addBuilder(config_id, &stor_opslogger);
set.addBuilder(config_id, &stor_prioritymapping);
set.addBuilder(config_id, &upgrading);
set.addBuilder(config_id, &stor_server);
set.addBuilder(config_id, &stor_status);
- set.addBuilder(config_id, &stor_visitor);
set.addBuilder(config_id, &bucketspaces);
set.addBuilder(config_id, &load_type);
set.addBuilder(config_id, &metricsmanager);
@@ -416,6 +425,58 @@ struct MyStorageConfig
MyStorageConfig::~MyStorageConfig() = default;
+struct MyServiceLayerConfig : public MyStorageConfig
+{
+ PersistenceConfigBuilder persistence;
+ StorFilestorConfigBuilder stor_filestor;
+ StorBucketInitConfigBuilder stor_bucket_init;
+ StorVisitorConfigBuilder stor_visitor;
+
+ MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, uint32_t rpc_network_threads)
+ : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads),
+ persistence(),
+ stor_filestor(),
+ stor_bucket_init(),
+ stor_visitor()
+ {
+ }
+
+ ~MyServiceLayerConfig();
+
+ void add_builders(ConfigSet &set) {
+ MyStorageConfig::add_builders(set);
+ set.addBuilder(config_id, &persistence);
+ set.addBuilder(config_id, &stor_filestor);
+ set.addBuilder(config_id, &stor_bucket_init);
+ set.addBuilder(config_id, &stor_visitor);
+ }
+};
+
+MyServiceLayerConfig::~MyServiceLayerConfig() = default;
+
+struct MyDistributorConfig : public MyStorageConfig
+{
+ StorDistributormanagerConfigBuilder stor_distributormanager;
+ StorVisitordispatcherConfigBuilder stor_visitordispatcher;
+
+ MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads)
+ : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads),
+ stor_distributormanager(),
+ stor_visitordispatcher()
+ {
+ }
+
+ ~MyDistributorConfig();
+
+ void add_builders(ConfigSet &set) {
+ MyStorageConfig::add_builders(set);
+ set.addBuilder(config_id, &stor_distributormanager);
+ set.addBuilder(config_id, &stor_visitordispatcher);
+ }
+};
+
+MyDistributorConfig::~MyDistributorConfig() = default;
+
struct MyRpcClientConfig {
vespalib::string config_id;
SlobroksConfigBuilder slobroks;
@@ -452,8 +513,13 @@ struct PersistenceProviderFixture {
DummyFileHeaderContext _file_header_context;
int _tls_listen_port;
int _slobrok_port;
- int _status_port;
int _rpc_client_port;
+ int _service_layer_mbus_port;
+ int _service_layer_rpc_port;
+ int _service_layer_status_port;
+ int _distributor_mbus_port;
+ int _distributor_rpc_port;
+ int _distributor_status_port;
TransLogServer _tls;
vespalib::string _tls_spec;
matching::QueryLimiter _query_limiter;
@@ -468,14 +534,17 @@ struct PersistenceProviderFixture {
MyResourceWriteFilter _write_filter;
std::shared_ptr<PersistenceEngine> _persistence_engine;
uint32_t _bucket_bits;
- MyStorageConfig _service_layer_config;
+ MyServiceLayerConfig _service_layer_config;
+ MyDistributorConfig _distributor_config;
MyRpcClientConfig _rpc_client_config;
ConfigSet _config_set;
std::shared_ptr<IConfigContext> _config_context;
std::unique_ptr<IBmFeedHandler> _feed_handler;
std::unique_ptr<mbus::Slobrok> _slobrok;
+ std::shared_ptr<StorageApiChainBmFeedHandler::Context> _service_layer_chain_context;
std::unique_ptr<MyServiceLayerProcess> _service_layer;
std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
+ std::unique_ptr<storage::DistributorProcess> _distributor;
PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
@@ -487,7 +556,11 @@ struct PersistenceProviderFixture {
std::unique_ptr<Document> make_document(uint32_t i) const;
std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const;
void create_buckets();
- void start_service_layer(bool use_storage_chain);
+ void start_service_layer(const BMParams& params);
+ void start_distributor(const BMParams& params);
+ void create_feed_handler(const BMParams& params);
+ void shutdown_feed_handler();
+ void shutdown_distributor();
void shutdown_service_layer();
};
@@ -502,8 +575,13 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_file_header_context(),
_tls_listen_port(9017),
_slobrok_port(9018),
- _status_port(9019),
- _rpc_client_port(9020),
+ _rpc_client_port(9019),
+ _service_layer_mbus_port(9020),
+ _service_layer_rpc_port(9021),
+ _service_layer_status_port(9022),
+ _distributor_mbus_port(9023),
+ _distributor_rpc_port(9024),
+ _distributor_status_port(9025),
_tls("tls", _tls_listen_port, _base_dir, _file_header_context),
_tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
_query_limiter(),
@@ -518,20 +596,24 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_write_filter(),
_persistence_engine(),
_bucket_bits(16),
- _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()),
+ _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params.get_rpc_network_threads()),
+ _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params.get_rpc_network_threads()),
_rpc_client_config("bm-rpc-client", _slobrok_port),
_config_set(),
_config_context(std::make_shared<ConfigContext>(_config_set)),
_feed_handler(),
_slobrok(),
+ _service_layer_chain_context(),
_service_layer(),
- _rpc_client_shared_rpc_resources()
+ _rpc_client_shared_rpc_resources(),
+ _distributor()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
_service_layer_config.add_builders(_config_set);
+ _distributor_config.add_builders(_config_set);
_rpc_client_config.add_builders(_config_set);
_feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine);
}
@@ -626,17 +708,16 @@ PersistenceProviderFixture::create_buckets()
}
void
-PersistenceProviderFixture::start_service_layer(bool use_storage_chain)
+PersistenceProviderFixture::start_service_layer(const BMParams& params)
{
LOG(info, "start slobrok");
_slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
LOG(info, "start service layer");
config::ConfigUri config_uri("bm-servicelayer", _config_context);
std::unique_ptr<storage::IStorageChainBuilder> chain_builder;
- std::shared_ptr<StorageApiChainBmFeedHandler::Context> context;
- if (use_storage_chain) {
- context = StorageApiChainBmFeedHandler::get_context();
- chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context);
+ if (params.get_use_storage_chain()) {
+ _service_layer_chain_context = StorageApiChainBmFeedHandler::get_context();
+ chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(_service_layer_chain_context);
}
_service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
*_persistence_engine,
@@ -648,17 +729,50 @@ PersistenceProviderFixture::start_service_layer(bool use_storage_chain)
config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
_rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
_rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
- if (use_storage_chain) {
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context));
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+}
+
+void
+PersistenceProviderFixture::start_distributor(const BMParams& params)
+{
+ if (params.get_enable_distributor()) {
+ config::ConfigUri config_uri("bm-distributor", _config_context);
+ _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
+ _distributor->setupConfig(100ms);
+ _distributor->createNode();
}
}
void
-PersistenceProviderFixture::shutdown_service_layer()
+PersistenceProviderFixture::create_feed_handler(const BMParams& params)
+{
+ if (params.get_enable_service_layer()) {
+ if (params.get_use_storage_chain()) {
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo);
+ }
+ }
+}
+
+void
+PersistenceProviderFixture::shutdown_feed_handler()
{
_feed_handler.reset();
+}
+
+void
+PersistenceProviderFixture::shutdown_distributor()
+{
+ if (_distributor) {
+ LOG(info, "stop distributor");
+ _distributor->getNode().requestShutdown("controlled shutdown");
+ _distributor->shutdown();
+ }
+}
+
+void
+PersistenceProviderFixture::shutdown_service_layer()
+{
if (_rpc_client_shared_rpc_resources) {
LOG(info, "stop rpc client shared resources");
_rpc_client_shared_rpc_resources->shutdown();
@@ -851,8 +965,12 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "create %u buckets", f.num_buckets());
f.create_buckets();
if (bm_params.get_enable_service_layer()) {
- f.start_service_layer(bm_params.get_use_storage_chain());
+ f.start_service_layer(bm_params);
}
+ if (bm_params.get_enable_distributor()) {
+ f.start_distributor(bm_params);
+ }
+ f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put");
auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update");
@@ -867,6 +985,8 @@ void benchmark_async_spi(const BMParams &bm_params)
for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) {
run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params);
}
+ f.shutdown_feed_handler();
+ f.shutdown_distributor();
f.shutdown_service_layer();
}
@@ -903,6 +1023,7 @@ App::usage()
"[--update-passes update-passes]\n"
"[--remove-passes remove-passes]\n"
"[--rpc-network-threads threads]\n"
+ "[--enable-distributor]\n"
"[--enable-service-layer]\n"
"[--use-storage-chain]" << std::endl;
}
@@ -920,6 +1041,7 @@ App::get_options()
{ "update-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
{ "rpc-network-threads", 1, nullptr, 0 },
+ { "enable-distributor", 0, nullptr, 0 },
{ "enable-service-layer", 0, nullptr, 0 },
{ "use-storage-chain", 0, nullptr, 0 }
};
@@ -930,6 +1052,7 @@ App::get_options()
LONGOPT_UPDATE_PASSES,
LONGOPT_REMOVE_PASSES,
LONGOPT_RPC_NETWORK_THREADS,
+ LONGOPT_ENABLE_DISTRIBUTOR,
LONGOPT_ENABLE_SERVICE_LAYER,
LONGOPT_USE_STORAGE_CHAIN
};
@@ -957,6 +1080,9 @@ App::get_options()
case LONGOPT_RPC_NETWORK_THREADS:
_bm_params.set_rpc_network_threads(atoi(opt_argument));
break;
+ case LONGOPT_ENABLE_DISTRIBUTOR:
+ _bm_params.set_enable_distributor(true);
+ break;
case LONGOPT_ENABLE_SERVICE_LAYER:
_bm_params.set_enable_service_layer(true);
break;