diff options
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/CMakeLists.txt | 1 | ||||
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/CMakeLists.txt | 34 | ||||
-rw-r--r-- | searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 1073 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt | 42 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp | 180 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h | 55 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp | 47 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h | 67 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp | 195 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_feed.h | 57 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_message_bus.h) | 4 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp | 675 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_node.h | 56 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_range.h | 24 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h (renamed from searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h | 28 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp) | 2 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h) | 2 |
42 files changed, 1556 insertions, 1036 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index 2d5eb8dbc4f..c76f35bd9ff 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -20,6 +20,7 @@ vespa_define_module( fileacquirer LIBS + src/vespa/searchcore/bmcluster src/vespa/searchcore/config src/vespa/searchcore/grouping src/vespa/searchcore/proton/attribute diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index fe83c89d83a..daefef5d413 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -2,39 +2,7 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp - bm_cluster_controller.cpp - bm_message_bus.cpp - bm_storage_chain_builder.cpp - bm_storage_link.cpp - bucket_info_queue.cpp - document_api_message_bus_bm_feed_handler.cpp - pending_tracker.cpp - pending_tracker_hash.cpp - spi_bm_feed_handler.cpp - storage_api_chain_bm_feed_handler.cpp - storage_api_message_bus_bm_feed_handler.cpp - storage_api_rpc_bm_feed_handler.cpp - storage_reply_error_checker.cpp OUTPUT_NAME vespa-feed-bm DEPENDS - searchcore_server - searchcore_initializer - searchcore_reprocessing - searchcore_index - searchcore_persistenceengine - searchcore_docsummary - searchcore_feedoperation - searchcore_matching - searchcore_attribute - searchcore_documentmetastore - searchcore_bucketdb - searchcore_flushengine - searchcore_pcommon - searchcore_grouping - searchcore_proton_metrics - searchcore_fconfig - storageserver_storageapp - messagebus_messagebus-test - messagebus - searchlib_searchlib_uca + searchcore_bmcluster ) diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index cd1920d237f..9752a4b5c36 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,80 +1,25 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bm_cluster_controller.h" -#include "bm_message_bus.h" -#include "bm_storage_chain_builder.h" -#include "bm_storage_link_context.h" -#include "pending_tracker.h" -#include "spi_bm_feed_handler.h" -#include "storage_api_chain_bm_feed_handler.h" -#include "storage_api_message_bus_bm_feed_handler.h" -#include "storage_api_rpc_bm_feed_handler.h" -#include "document_api_message_bus_bm_feed_handler.h" -#include <tests/proton/common/dummydbowner.h> -#include <vespa/config-attributes.h> -#include <vespa/config-bucketspaces.h> -#include <vespa/config-imported-fields.h> -#include <vespa/config-indexschema.h> -#include <vespa/config-persistence.h> -#include <vespa/config-rank-profiles.h> -#include <vespa/config-slobroks.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config-stor-filestor.h> -#include <vespa/config-summary.h> -#include <vespa/config-summarymap.h> -#include <vespa/config-upgrading.h> -#include <vespa/config/common/configcontext.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/fieldset/fieldsetrepo.h> -#include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/repo/configbuilder.h> #include <vespa/document/repo/document_type_repo_factory.h> #include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/test/make_bucket_space.h> -#include <vespa/document/update/assignvalueupdate.h> -#include <vespa/document/update/documentupdate.h> #include <vespa/fastos/app.h> -#include <vespa/messagebus/config-messagebus.h> -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/metrics/config-metricsmanager.h> -#include <vespa/searchcommon/common/schemaconfigurer.h> -#include <vespa/searchcore/proton/common/alloc_config.h> -#include <vespa/searchcore/proton/common/hw_info.h> -#include <vespa/searchcore/proton/matching/querylimiter.h> -#include <vespa/searchcore/proton/metrics/metricswireservice.h> -#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> -#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> -#include <vespa/searchcore/proton/server/bootstrapconfig.h> -#include <vespa/searchcore/proton/server/document_db_maintenance_config.h> -#include <vespa/searchcore/proton/server/documentdb.h> -#include <vespa/searchcore/proton/server/documentdbconfigmanager.h> -#include <vespa/searchcore/proton/server/fileconfigmanager.h> -#include <vespa/searchcore/proton/server/memoryconfigstore.h> -#include <vespa/searchcore/proton/server/persistencehandlerproxy.h> -#include <vespa/searchcore/proton/server/threading_service_config.h> -#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/searchcore/bmcluster/bm_cluster.h> +#include <vespa/searchcore/bmcluster/bm_cluster_controller.h> +#include <vespa/searchcore/bmcluster/bm_cluster_params.h> +#include <vespa/searchcore/bmcluster/bm_feed.h> +#include <vespa/searchcore/bmcluster/bm_node.h> +#include <vespa/searchcore/bmcluster/bm_range.h> +#include <vespa/searchcore/bmcluster/bucket_selector.h> +#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> -#include <vespa/searchlib/transactionlog/translogserver.h> -#include <vespa/searchsummary/config/config-juniperrc.h> -#include <vespa/slobrok/sbmirror.h> -#include <vespa/storage/bucketdb/config-stor-bucket-init.h> -#include <vespa/storage/common/i_storage_chain_builder.h> -#include <vespa/storage/config/config-stor-bouncer.h> -#include <vespa/storage/config/config-stor-communicationmanager.h> -#include <vespa/storage/config/config-stor-distributormanager.h> -#include <vespa/storage/config/config-stor-opslogger.h> -#include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/config/config-stor-status.h> -#include <vespa/storage/config/config-stor-visitordispatcher.h> -#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> -#include <vespa/storage/visiting/config-stor-visitor.h> -#include <vespa/storageserver/app/distributorprocess.h> -#include <vespa/storageserver/app/servicelayerprocess.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <getopt.h> #include <iostream> #include <thread> @@ -82,82 +27,30 @@ #include <vespa/log/log.h> LOG_SETUP("vespa-feed-bm"); -using namespace cloud::config::filedistribution; -using namespace config; using namespace proton; using namespace std::chrono_literals; -using namespace vespa::config::search::core; -using namespace vespa::config::search::summary; -using namespace vespa::config::search; -using vespa::config::content::PersistenceConfigBuilder; -using vespa::config::content::StorDistributionConfigBuilder; -using vespa::config::content::StorFilestorConfigBuilder; -using vespa::config::content::UpgradingConfigBuilder; -using vespa::config::content::core::BucketspacesConfig; -using vespa::config::content::core::BucketspacesConfigBuilder; -using vespa::config::content::core::StorBouncerConfigBuilder; -using vespa::config::content::core::StorBucketInitConfigBuilder; -using vespa::config::content::core::StorCommunicationmanagerConfigBuilder; -using vespa::config::content::core::StorDistributormanagerConfigBuilder; -using vespa::config::content::core::StorOpsloggerConfigBuilder; -using vespa::config::content::core::StorPrioritymappingConfigBuilder; -using vespa::config::content::core::StorServerConfigBuilder; -using vespa::config::content::core::StorStatusConfigBuilder; -using vespa::config::content::core::StorVisitorConfigBuilder; -using vespa::config::content::core::StorVisitordispatcherConfigBuilder; -using cloud::config::SlobroksConfigBuilder; -using messagebus::MessagebusConfigBuilder; -using metrics::MetricsmanagerConfigBuilder; -using config::ConfigContext; -using config::ConfigSet; -using config::ConfigUri; -using config::IConfigContext; -using document::AssignValueUpdate; -using document::BucketId; -using document::BucketSpace; -using document::Document; -using document::DocumentId; -using document::DocumentType; using document::DocumentTypeRepo; using document::DocumentTypeRepoFactory; -using document::DocumentUpdate; using document::DocumenttypesConfig; using document::DocumenttypesConfigBuilder; -using document::Field; -using document::FieldSetRepo; -using document::FieldUpdate; -using document::IntFieldValue; -using document::test::makeBucketSpace; -using feedbm::BmClusterController; -using feedbm::BmMessageBus; -using feedbm::BmStorageChainBuilder; -using feedbm::BmStorageLinkContext; -using feedbm::IBmFeedHandler; -using feedbm::DocumentApiMessageBusBmFeedHandler; -using feedbm::SpiBmFeedHandler; -using feedbm::StorageApiChainBmFeedHandler; -using feedbm::StorageApiMessageBusBmFeedHandler; -using feedbm::StorageApiRpcBmFeedHandler; -using search::TuneFileDocumentDB; +using search::bmcluster::BmClusterController; +using search::bmcluster::IBmFeedHandler; +using search::bmcluster::BmClusterParams; +using search::bmcluster::BmCluster; +using search::bmcluster::BmFeed; +using search::bmcluster::BmNode; +using search::bmcluster::BmRange; +using search::bmcluster::BucketSelector; using search::index::DummyFileHeaderContext; -using search::index::Schema; -using search::index::SchemaBuilder; -using search::transactionlog::TransLogServer; -using storage::rpc::SharedRpcResources; -using storage::rpc::StorageApiRpcService; using storage::spi::PersistenceProvider; -using vespalib::compression::CompressionConfig; using vespalib::makeLambdaTask; -using proton::ThreadingServiceConfig; - -using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>; namespace { vespalib::string base_dir = "testdb"; -std::shared_ptr<DocumenttypesConfig> make_document_type() { +std::shared_ptr<DocumenttypesConfig> make_document_types() { using Struct = document::config_builder::Struct; using DataType = document::DataType; document::config_builder::DocumenttypesConfigBuilderHelper builder; @@ -165,130 +58,14 @@ std::shared_ptr<DocumenttypesConfig> make_document_type() { return std::make_shared<DocumenttypesConfig>(builder.config()); } -std::shared_ptr<AttributesConfig> make_attributes_config() { - AttributesConfigBuilder builder; - AttributesConfig::Attribute attribute; - attribute.name = "int"; - attribute.datatype = AttributesConfig::Attribute::Datatype::INT32; - builder.attribute.emplace_back(attribute); - return std::make_shared<AttributesConfig>(builder); -} - -std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name) -{ - auto indexschema = std::make_shared<IndexschemaConfig>(); - auto attributes = make_attributes_config(); - auto summary = std::make_shared<SummaryConfig>(); - std::shared_ptr<Schema> schema(new Schema()); - SchemaBuilder::build(*indexschema, *schema); - SchemaBuilder::build(*attributes, *schema); - SchemaBuilder::build(*summary, *schema); - return std::make_shared<DocumentDBConfig>( - 1, - std::make_shared<RankProfilesConfig>(), - std::make_shared<matching::RankingConstants>(), - std::make_shared<matching::RankingExpressions>(), - std::make_shared<matching::OnnxModels>(), - indexschema, - attributes, - summary, - std::make_shared<SummarymapConfig>(), - std::make_shared<JuniperrcConfig>(), - document_types, - repo, - std::make_shared<ImportedFieldsConfig>(), - std::make_shared<TuneFileDocumentDB>(), - schema, - std::make_shared<DocumentDBMaintenanceConfig>(), - search::LogDocumentStore::Config(), - std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)), - std::make_shared<const AllocConfig>(), - "client", - doc_type_name.getName()); -} - -void -make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) -{ - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); -} - -void -make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces) -{ - BucketspacesConfigBuilder::Documenttype bucket_space_map; - bucket_space_map.name = "test"; - bucket_space_map.bucketspace = "default"; - bucketspaces.documenttype.emplace_back(std::move(bucket_space_map)); -} - -class MyPersistenceEngineOwner : public IPersistenceEngineOwner -{ - void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { } -}; - -struct MyResourceWriteFilter : public IResourceWriteFilter -{ - bool acceptWriteOperation() const override { return true; } - State getAcceptState() const override { return IResourceWriteFilter::State(); } -}; - -class BucketSelector -{ - uint32_t _thread_id; - uint32_t _threads; - uint32_t _num_buckets; -public: - BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in) - : _thread_id(thread_id_in), - _threads(threads_in), - _num_buckets((num_buckets_in / _threads) * _threads) - { - } - uint64_t operator()(uint32_t i) const { - return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; - } -}; - -class BMRange -{ - uint32_t _start; - uint32_t _end; -public: - BMRange(uint32_t start_in, uint32_t end_in) - : _start(start_in), - _end(end_in) - { - } - uint32_t get_start() const { return _start; } - uint32_t get_end() const { return _end; } -}; - -class BMParams { +class BMParams : public BmClusterParams { uint32_t _documents; uint32_t _client_threads; uint32_t _get_passes; - vespalib::string _indexing_sequencer; uint32_t _put_passes; uint32_t _update_passes; uint32_t _remove_passes; - uint32_t _rpc_network_threads; - uint32_t _rpc_events_before_wakeup; - uint32_t _rpc_targets_per_node; - uint32_t _response_threads; uint32_t _max_pending; - bool _enable_distributor; - bool _enable_service_layer; - bool _skip_get_spi_bucket_info; - bool _use_document_api; - bool _use_message_bus; - bool _use_storage_chain; - bool _use_async_message_handling_on_schedule; - uint32_t _bucket_db_stripe_bits; - uint32_t _distributor_stripes; - bool _skip_communicationmanager_thread; uint32_t get_start(uint32_t thread_id) const { return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads); } @@ -297,82 +74,39 @@ public: : _documents(160000), _client_threads(1), _get_passes(0), - _indexing_sequencer(), _put_passes(2), _update_passes(1), _remove_passes(2), - _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def - _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def - _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def - _response_threads(2), // Same default as in stor-filestor.def - _max_pending(1000), - _enable_distributor(false), - _enable_service_layer(false), - _skip_get_spi_bucket_info(false), - _use_document_api(false), - _use_message_bus(false), - _use_storage_chain(false), - _use_async_message_handling_on_schedule(false), - _bucket_db_stripe_bits(0), - _distributor_stripes(0), - _skip_communicationmanager_thread(false) // Same default as in stor-communicationmanager.def + _max_pending(1000) { } - BMRange get_range(uint32_t thread_id) const { - return BMRange(get_start(thread_id), get_start(thread_id + 1)); + BmRange get_range(uint32_t thread_id) const { + return BmRange(get_start(thread_id), get_start(thread_id + 1)); } uint32_t get_documents() const { return _documents; } uint32_t get_max_pending() const { return _max_pending; } uint32_t get_client_threads() const { return _client_threads; } uint32_t get_get_passes() const { return _get_passes; } - const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } uint32_t get_put_passes() const { return _put_passes; } uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } - uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } - uint32_t get_rpc_events_before_wakup() const { return _rpc_events_before_wakeup; } - uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; } - uint32_t get_response_threads() const { return _response_threads; } - bool get_enable_distributor() const { return _enable_distributor; } - bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; } - bool get_use_document_api() const { return _use_document_api; } - bool get_use_message_bus() const { return _use_message_bus; } - bool get_use_storage_chain() const { return _use_storage_chain; } - bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; } - uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } - uint32_t get_distributor_stripes() const { return _distributor_stripes; } - bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; } void set_documents(uint32_t documents_in) { _documents = documents_in; } void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; } void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; } void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; } - void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; } void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } - void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } - void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; } - void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; } - void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } - void set_enable_distributor(bool value) { _enable_distributor = value; } - void set_enable_service_layer(bool value) { _enable_service_layer = value; } - void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; } - void set_use_document_api(bool value) { _use_document_api = value; } - void set_use_message_bus(bool value) { _use_message_bus = value; } - void set_use_storage_chain(bool value) { _use_storage_chain = value; } - void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; } - void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } - void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; } - void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; } bool check() const; - bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } - bool needs_distributor() const { return _enable_distributor || _use_document_api; } - bool needs_message_bus() const { return _use_message_bus || _use_document_api; } + bool needs_message_bus() const { return get_use_message_bus() || get_use_document_api(); } }; bool BMParams::check() const { + if (!BmClusterParams::check()) { + return false; + } if (_client_threads < 1) { std::cerr << "Too few client threads: " << _client_threads << std::endl; return false; @@ -389,533 +123,82 @@ BMParams::check() const std::cerr << "Put passes too low: " << _put_passes << std::endl; return false; } - if (_rpc_network_threads < 1) { - std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; - return false; - } - if (_rpc_targets_per_node < 1) { - std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl; - return false; - } - if (_response_threads < 1) { - std::cerr << "Too few response threads: " << _response_threads << std::endl; - return false; - } return true; } -class MyServiceLayerProcess : public storage::ServiceLayerProcess { - PersistenceProvider& _provider; - -public: - MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider, - std::unique_ptr<storage::IStorageChainBuilder> chain_builder); - ~MyServiceLayerProcess() override { shutdown(); } - - void shutdown() override; - void setupProvider() override; - PersistenceProvider& getProvider() override; -}; - -MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri, - PersistenceProvider &provider, - std::unique_ptr<storage::IStorageChainBuilder> chain_builder) - : ServiceLayerProcess(configUri), - _provider(provider) -{ - if (chain_builder) { - set_storage_chain_builder(std::move(chain_builder)); - } -} - -void -MyServiceLayerProcess::shutdown() -{ - ServiceLayerProcess::shutdown(); -} - -void -MyServiceLayerProcess::setupProvider() -{ -} - -PersistenceProvider& -MyServiceLayerProcess::getProvider() -{ - return _provider; -} - -struct MyStorageConfig -{ - vespalib::string config_id; - DocumenttypesConfigBuilder documenttypes; - StorDistributionConfigBuilder stor_distribution; - StorBouncerConfigBuilder stor_bouncer; - StorCommunicationmanagerConfigBuilder stor_communicationmanager; - StorOpsloggerConfigBuilder stor_opslogger; - StorPrioritymappingConfigBuilder stor_prioritymapping; - UpgradingConfigBuilder upgrading; - StorServerConfigBuilder stor_server; - StorStatusConfigBuilder stor_status; - BucketspacesConfigBuilder bucketspaces; - MetricsmanagerConfigBuilder metricsmanager; - SlobroksConfigBuilder slobroks; - MessagebusConfigBuilder messagebus; - - MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : config_id(config_id_in), - documenttypes(documenttypes_in), - stor_distribution(), - stor_bouncer(), - stor_communicationmanager(), - stor_opslogger(), - stor_prioritymapping(), - upgrading(), - stor_server(), - stor_status(), - bucketspaces(), - metricsmanager(), - slobroks(), - messagebus() - { - { - auto &dc = stor_distribution; - { - StorDistributionConfigBuilder::Group group; - { - StorDistributionConfigBuilder::Group::Nodes node; - node.index = 0; - group.nodes.push_back(std::move(node)); - } - group.index = "invalid"; - group.name = "invalid"; - group.capacity = 1.0; - group.partitions = ""; - dc.group.push_back(std::move(group)); - } - dc.redundancy = 1; - dc.readyCopies = 1; - } - stor_server.isDistributor = distributor; - stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); - if (distributor) { - stor_server.rootFolder = "distributor"; - } else { - stor_server.rootFolder = "storage"; - } - make_slobroks_config(slobroks, slobrok_port); - stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads(); - stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakup(); - stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node(); - stor_communicationmanager.mbusport = mbus_port; - stor_communicationmanager.rpcport = rpc_port; - stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread(); - - stor_status.httpport = status_port; - make_bucketspaces_config(bucketspaces); - } - - ~MyStorageConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &documenttypes); - set.addBuilder(config_id, &stor_distribution); - set.addBuilder(config_id, &stor_bouncer); - set.addBuilder(config_id, &stor_communicationmanager); - set.addBuilder(config_id, &stor_opslogger); - set.addBuilder(config_id, &stor_prioritymapping); - set.addBuilder(config_id, &upgrading); - set.addBuilder(config_id, &stor_server); - set.addBuilder(config_id, &stor_status); - set.addBuilder(config_id, &bucketspaces); - set.addBuilder(config_id, &metricsmanager); - set.addBuilder(config_id, &slobroks); - set.addBuilder(config_id, &messagebus); - } -}; - -MyStorageConfig::~MyStorageConfig() = default; - -struct MyServiceLayerConfig : public MyStorageConfig -{ - PersistenceConfigBuilder persistence; - StorFilestorConfigBuilder stor_filestor; - StorBucketInitConfigBuilder stor_bucket_init; - StorVisitorConfigBuilder stor_visitor; - - MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), - persistence(), - stor_filestor(), - stor_bucket_init(), - stor_visitor() - { - stor_filestor.numResponseThreads = params.get_response_threads(); - stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); - stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); - } - - ~MyServiceLayerConfig(); - - void add_builders(ConfigSet &set) { - MyStorageConfig::add_builders(set); - set.addBuilder(config_id, &persistence); - set.addBuilder(config_id, &stor_filestor); - set.addBuilder(config_id, &stor_bucket_init); - set.addBuilder(config_id, &stor_visitor); - } -}; - -MyServiceLayerConfig::~MyServiceLayerConfig() = default; - -struct MyDistributorConfig : public MyStorageConfig -{ - StorDistributormanagerConfigBuilder stor_distributormanager; - StorVisitordispatcherConfigBuilder stor_visitordispatcher; - - MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, - int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params) - : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), - stor_distributormanager(), - stor_visitordispatcher() - { - stor_distributormanager.numDistributorStripes = params.get_distributor_stripes(); - } - - ~MyDistributorConfig(); - - void add_builders(ConfigSet &set) { - MyStorageConfig::add_builders(set); - set.addBuilder(config_id, &stor_distributormanager); - set.addBuilder(config_id, &stor_visitordispatcher); - } -}; - -MyDistributorConfig::~MyDistributorConfig() = default; - -struct MyRpcClientConfig { - vespalib::string config_id; - SlobroksConfigBuilder slobroks; - - MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port) - : config_id(config_id_in), - slobroks() - { - make_slobroks_config(slobroks, slobrok_port); - } - ~MyRpcClientConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &slobroks); - } -}; - -MyRpcClientConfig::~MyRpcClientConfig() = default; - -struct MyMessageBusConfig { - vespalib::string config_id; - SlobroksConfigBuilder slobroks; - MessagebusConfigBuilder messagebus; - - MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port) - : config_id(config_id_in), - slobroks(), - messagebus() - { - make_slobroks_config(slobroks, slobrok_port); - } - ~MyMessageBusConfig(); - - void add_builders(ConfigSet &set) { - set.addBuilder(config_id, &slobroks); - set.addBuilder(config_id, &messagebus); - } -}; - -MyMessageBusConfig::~MyMessageBusConfig() = default; - } struct PersistenceProviderFixture { - std::shared_ptr<DocumenttypesConfig> _document_types; + std::shared_ptr<const DocumenttypesConfig> _document_types; std::shared_ptr<const DocumentTypeRepo> _repo; - DocTypeName _doc_type_name; - const DocumentType* _document_type; - const Field& _field; - std::shared_ptr<DocumentDBConfig> _document_db_config; - vespalib::string _base_dir; - DummyFileHeaderContext _file_header_context; - int _tls_listen_port; - int _slobrok_port; - int _rpc_client_port; - int _service_layer_mbus_port; - int _service_layer_rpc_port; - int _service_layer_status_port; - int _distributor_mbus_port; - int _distributor_rpc_port; - int _distributor_status_port; - TransLogServer _tls; - vespalib::string _tls_spec; - matching::QueryLimiter _query_limiter; - vespalib::Clock _clock; - DummyWireService _metrics_wire_service; - MemoryConfigStores _config_stores; - vespalib::ThreadStackExecutor _summary_executor; - DummyDBOwner _document_db_owner; - BucketSpace _bucket_space; - std::shared_ptr<DocumentDB> _document_db; - MyPersistenceEngineOwner _persistence_owner; - MyResourceWriteFilter _write_filter; - test::DiskMemUsageNotifier _disk_mem_usage_notifier; - std::shared_ptr<PersistenceEngine> _persistence_engine; - std::unique_ptr<const FieldSetRepo> _field_set_repo; - uint32_t _bucket_bits; - MyServiceLayerConfig _service_layer_config; - MyDistributorConfig _distributor_config; - MyRpcClientConfig _rpc_client_config; - MyMessageBusConfig _message_bus_config; - ConfigSet _config_set; - std::shared_ptr<IConfigContext> _config_context; - std::unique_ptr<IBmFeedHandler> _feed_handler; - std::unique_ptr<mbus::Slobrok> _slobrok; - std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context; - std::unique_ptr<MyServiceLayerProcess> _service_layer; - std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; - std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; - std::unique_ptr<storage::DistributorProcess> _distributor; - std::unique_ptr<BmMessageBus> _message_bus; + std::unique_ptr<BmCluster> _bm_cluster; + std::unique_ptr<BmNode> _bm_node; + BmFeed _feed; + IBmFeedHandler* _feed_handler; explicit PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); - void create_document_db(const BMParams & params); - uint32_t num_buckets() const { return (1u << _bucket_bits); } - BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); } - document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); } - DocumentId make_document_id(uint32_t n, uint32_t i) const; - std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const; - std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; void create_buckets(); void wait_slobrok(const vespalib::string &name); - void start_service_layer(const BMParams& params); - void start_distributor(const BMParams& params); + void start_service_layer(const BmClusterParams& params); + void start_distributor(const BmClusterParams& params); void start_message_bus(); - void create_feed_handler(const BMParams& params); + void create_feed_handler(const BmClusterParams& params); void shutdown_feed_handler(); void shutdown_message_bus(); void shutdown_distributor(); void shutdown_service_layer(); + PersistenceProvider* get_persistence_provider() { return _bm_node->get_persistence_provider(); } }; PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) - : _document_types(make_document_type()), - _repo(DocumentTypeRepoFactory::make(*_document_types)), - _doc_type_name("test"), - _document_type(_repo->getDocumentType(_doc_type_name.getName())), - _field(_document_type->getField("int")), - _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)), - _base_dir(base_dir), - _file_header_context(), - _tls_listen_port(9017), - _slobrok_port(9018), - _rpc_client_port(9019), - _service_layer_mbus_port(9020), - _service_layer_rpc_port(9021), - _service_layer_status_port(9022), - _distributor_mbus_port(9023), - _distributor_rpc_port(9024), - _distributor_status_port(9025), - _tls("tls", _tls_listen_port, _base_dir, _file_header_context), - _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), - _query_limiter(), - _clock(), - _metrics_wire_service(), - _config_stores(), - _summary_executor(8, 128_Ki), - _document_db_owner(), - _bucket_space(makeBucketSpace(_doc_type_name.getName())), - _document_db(), - _persistence_owner(), - _write_filter(), - _disk_mem_usage_notifier(), - _persistence_engine(), - _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)), - _bucket_bits(16), - _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), - _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), - _rpc_client_config("bm-rpc-client", _slobrok_port), - _message_bus_config("bm-message-bus", _slobrok_port), - _config_set(), - _config_context(std::make_shared<ConfigContext>(_config_set)), - _feed_handler(), - _slobrok(), - _service_layer_chain_context(), - _service_layer(), - _rpc_client_shared_rpc_resources(), - _distributor_chain_context(), - _distributor(), - _message_bus() -{ - create_document_db(params); - _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false); - auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); - _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); - _service_layer_config.add_builders(_config_set); - _distributor_config.add_builders(_config_set); - _rpc_client_config.add_builders(_config_set); - _message_bus_config.add_builders(_config_set); - _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info()); -} - -PersistenceProviderFixture::~PersistenceProviderFixture() -{ - if (_persistence_engine) { - _persistence_engine->destroyIterators(); - _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name); - } - if (_document_db) { - _document_db->close(); - } -} - -void -PersistenceProviderFixture::create_document_db(const BMParams & params) -{ - vespalib::mkdir(_base_dir, false); - vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false); - vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig"; - { - FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName()); - fileCfg.saveConfig(*_document_db_config, 1); - } - config::DirSpec spec(input_cfg + "/config-1"); - auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>(); - DocumentDBConfigHelper mgr(spec, _doc_type_name.getName()); - auto protonCfg = std::make_shared<ProtonConfigBuilder>(); - if ( ! params.get_indexing_sequencer().empty()) { - vespalib::string sequencer = params.get_indexing_sequencer(); - std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); }); - protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer); - } - auto bootstrap_config = std::make_shared<BootstrapConfig>(1, - _document_types, - _repo, - std::move(protonCfg), - std::make_shared<FiledistributorrpcConfig>(), - std::make_shared<BucketspacesConfig>(), - tuneFileDocDB, HwInfo()); - mgr.forwardConfig(bootstrap_config); - mgr.nextGeneration(0ms); - _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, - _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, - _summary_executor, _summary_executor, *_persistence_engine, _tls, - _metrics_wire_service, _file_header_context, - _config_stores.getConfigStore(_doc_type_name.toString()), - std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo()); - _document_db->start(); - _document_db->waitForOnlineState(); -} - -DocumentId -PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const + : _document_types(make_document_types()), + _repo(document::DocumentTypeRepoFactory::make(*_document_types)), + _bm_cluster(std::make_unique<BmCluster>(params, _repo)), + _bm_node(BmNode::create(params, _document_types)), + _feed(_repo), + _feed_handler(nullptr) { - DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i)); - return id; } -std::unique_ptr<Document> -PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const -{ - auto id = make_document_id(n, i); - auto document = std::make_unique<Document>(*_document_type, id); - document->setRepo(*_repo); - document->setFieldValue(_field, std::make_unique<IntFieldValue>(i)); - return document; -} - -std::unique_ptr<DocumentUpdate> -PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const -{ - auto id = make_document_id(n, i); - auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); - document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); - return document_update; -} +PersistenceProviderFixture::~PersistenceProviderFixture() = default; void PersistenceProviderFixture::create_buckets() { - SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false); - for (unsigned int i = 0; i < num_buckets(); ++i) { - feed_handler.create_bucket(make_bucket(i)); + auto feed_handler = _bm_node->make_create_bucket_feed_handler(false); + for (unsigned int i = 0; i < _feed.num_buckets(); ++i) { + feed_handler->create_bucket(_feed.make_bucket(i)); } } void PersistenceProviderFixture::wait_slobrok(const vespalib::string &name) { - auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror(); - LOG(info, "Waiting for %s in slobrok", name.c_str()); - for (;;) { - auto specs = mirror.lookup(name); - if (!specs.empty()) { - LOG(info, "Found %s in slobrok", name.c_str()); - return; - } - std::this_thread::sleep_for(100ms); - } + _bm_cluster->wait_slobrok(name); } void -PersistenceProviderFixture::start_service_layer(const BMParams& params) +PersistenceProviderFixture::start_service_layer(const BmClusterParams& params) { - LOG(info, "start slobrok"); - _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); - LOG(info, "start service layer"); - config::ConfigUri config_uri("bm-servicelayer", _config_context); - std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.needs_distributor()) { - chain_builder = std::make_unique<BmStorageChainBuilder>(); - _service_layer_chain_context = chain_builder->get_context(); - } - _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, - *_persistence_engine, - std::move(chain_builder)); - _service_layer->setupConfig(100ms); - _service_layer->createNode(); - _service_layer->getNode().waitUntilInitialized(); - LOG(info, "start rpc client shared resources"); - config::ConfigUri client_config_uri("bm-rpc-client", _config_context); - _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources> - (client_config_uri, _rpc_client_port, 100, params.get_rpc_events_before_wakup()); - _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); + _bm_cluster->start_slobrok(); + _bm_node->start_service_layer(params); + _bm_node->wait_service_layer(); + _bm_cluster->start_rpc_client(); wait_slobrok("storage/cluster.storage/storage/0/default"); wait_slobrok("storage/cluster.storage/storage/0"); - BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + BmClusterController fake_controller(_bm_cluster->get_rpc_client()); fake_controller.set_cluster_up(false); } void -PersistenceProviderFixture::start_distributor(const BMParams& params) +PersistenceProviderFixture::start_distributor(const BmClusterParams& params) { - config::ConfigUri config_uri("bm-distributor", _config_context); - std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.get_use_document_api()) { - chain_builder = std::make_unique<BmStorageChainBuilder>(); - _distributor_chain_context = chain_builder->get_context(); - } - _distributor = std::make_unique<storage::DistributorProcess>(config_uri); - if (chain_builder) { - _distributor->set_storage_chain_builder(std::move(chain_builder)); - } - _distributor->setupConfig(100ms); - _distributor->createNode(); + _bm_node->start_distributor(params); wait_slobrok("storage/cluster.storage/distributor/0/default"); wait_slobrok("storage/cluster.storage/distributor/0"); - BmClusterController fake_controller(*_rpc_client_shared_rpc_resources); + BmClusterController fake_controller(_bm_cluster->get_rpc_client()); fake_controller.set_cluster_up(true); // Wait for bucket ownership transfer safe time std::this_thread::sleep_for(2s); @@ -924,102 +207,45 @@ PersistenceProviderFixture::start_distributor(const BMParams& params) void PersistenceProviderFixture::start_message_bus() { - config::ConfigUri config_uri("bm-message-bus", _config_context); - LOG(info, "Starting message bus"); - _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo); - LOG(info, "Started message bus"); + _bm_cluster->start_message_bus(); } void -PersistenceProviderFixture::create_feed_handler(const BMParams& params) +PersistenceProviderFixture::create_feed_handler(const BmClusterParams& params) { - StorageApiRpcService::Params rpc_params; - // This is the same compression config as the default in stor-communicationmanager.def. - rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); - rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node(); - if (params.get_use_document_api()) { - _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus); - } else if (params.get_enable_distributor()) { - if (params.get_use_storage_chain()) { - assert(_distributor_chain_context); - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true); - } - } else if (params.needs_service_layer()) { - if (params.get_use_storage_chain()) { - assert(_service_layer_chain_context); - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false); - } - } + _bm_node->create_feed_handler(params, *_bm_cluster); + _feed_handler = _bm_node->get_feed_handler(); } void PersistenceProviderFixture::shutdown_feed_handler() { - _feed_handler.reset(); + _bm_node->shutdown_feed_handler(); + _feed_handler = nullptr; } void PersistenceProviderFixture::shutdown_message_bus() { - if (_message_bus) { - LOG(info, "stop message bus"); - _message_bus.reset(); - } + _bm_cluster->stop_message_bus(); } void PersistenceProviderFixture::shutdown_distributor() { - if (_distributor) { - LOG(info, "stop distributor"); - _distributor->getNode().requestShutdown("controlled shutdown"); - _distributor->shutdown(); - } + _bm_node->shutdown_distributor(); } void PersistenceProviderFixture::shutdown_service_layer() { - if (_rpc_client_shared_rpc_resources) { - LOG(info, "stop rpc client shared resources"); - _rpc_client_shared_rpc_resources->shutdown(); - _rpc_client_shared_rpc_resources.reset(); - } - if (_service_layer) { - LOG(info, "stop service layer"); - _service_layer->getNode().requestShutdown("controlled shutdown"); - _service_layer->shutdown(); - } - if (_slobrok) { - LOG(info, "stop slobrok"); - _slobrok.reset(); - } -} - -vespalib::nbostream -make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document = f.make_document(n, i); - document->serialize(serialized_feed); - } - return serialized_feed; + _bm_cluster->stop_rpc_client(); + _bm_node->shutdown_service_layer(); + _bm_cluster->stop_slobrok(); } std::vector<vespalib::nbostream> -make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) +make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) { LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); std::vector<vespalib::nbostream> serialized_feed_v; @@ -1038,27 +264,6 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st return serialized_feed_v; } -void -put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - auto &repo = *f._repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - auto document = std::make_unique<Document>(repo, is); - f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - class AvgSampler { private: double _total; @@ -1077,73 +282,42 @@ void run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { put_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput); time_bias += bm_params.get_documents(); } -vespalib::nbostream -make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document_update = f.make_document_update(n, i); - document_update->serializeHEAD(serialized_feed); - } - return serialized_feed; -} - -void -update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - auto &repo = *f._repo; - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - auto document_update = DocumentUpdate::createHEAD(repo, is); - f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - void run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { update_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput); @@ -1151,94 +325,44 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu } void -get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed) -{ - LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - vespalib::string all_fields(document::AllFields::NAME); - auto bucket_space = f._bucket_space; - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - DocumentId document_id(is); - f._feed_handler->get(bucket, all_fields, document_id, pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - -void run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() - { get_async_task(f, max_pending, range, serialized_feed); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]() + { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput); } -vespalib::nbostream -make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) -{ - vespalib::nbostream serialized_feed; - LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - auto n = bucket_selector(i); - serialized_feed << f.make_bucket_id(n); - auto document_id = f.make_document_id(n, i); - vespalib::string raw_id = document_id.toString(); - serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); - } - return serialized_feed; -} - -void -remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) -{ - LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); - feedbm::PendingTracker pending_tracker(max_pending); - f._feed_handler->attach_bucket_info_queue(pending_tracker); - vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - BucketId bucket_id; - auto bucket_space = f._bucket_space; - bool use_timestamp = !f._feed_handler->manages_timestamp(); - for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - is >> bucket_id; - document::Bucket bucket(bucket_space, bucket_id); - DocumentId document_id(is); - f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); - } - assert(is.empty()); - pending_tracker.drain(); -} - void run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler) { - uint32_t old_errors = f._feed_handler->get_error_count(); + auto& feed = f._feed; + auto& feed_handler = *f._feed_handler; + uint32_t old_errors = feed_handler.get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() - { remove_async_task(f, max_pending, range, serialized_feed, time_bias); })); + executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]() + { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + uint32_t new_errors = feed_handler.get_error_count() - old_errors; double throughput = bm_params.get_documents() / elapsed.count(); sampler.sample(throughput); LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput); @@ -1310,10 +434,10 @@ void benchmark_async_spi(const BMParams &bm_params) { vespalib::rmdir(base_dir, true); PersistenceProviderFixture f(bm_params); - auto &provider = *f._persistence_engine; + auto &provider = *f.get_persistence_provider(); LOG(info, "start initialize"); provider.initialize(); - LOG(info, "create %u buckets", f.num_buckets()); + LOG(info, "create %u buckets", f._feed.num_buckets()); if (!bm_params.needs_distributor()) { f.create_buckets(); } @@ -1328,9 +452,10 @@ void benchmark_async_spi(const BMParams &bm_params) } f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki); - auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); - auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update"); - auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove"); + auto& feed = f._feed; + auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put"); + auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update"); + auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove"); int64_t time_bias = 1; LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str()); benchmark_async_put(f, executor, time_bias, put_feed, bm_params); diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt new file mode 100644 index 00000000000..d8e31011122 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -0,0 +1,42 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcore_bmcluster STATIC + SOURCES + bm_cluster.cpp + bm_cluster_controller.cpp + bm_cluster_params.cpp + bm_feed.cpp + bm_message_bus.cpp + bm_node.cpp + bm_storage_chain_builder.cpp + bm_storage_link.cpp + bucket_info_queue.cpp + document_api_message_bus_bm_feed_handler.cpp + pending_tracker.cpp + pending_tracker_hash.cpp + spi_bm_feed_handler.cpp + storage_api_chain_bm_feed_handler.cpp + storage_api_message_bus_bm_feed_handler.cpp + storage_api_rpc_bm_feed_handler.cpp + storage_reply_error_checker.cpp + DEPENDS + searchcore_server + searchcore_initializer + searchcore_reprocessing + searchcore_index + searchcore_persistenceengine + searchcore_docsummary + searchcore_feedoperation + searchcore_matching + searchcore_attribute + searchcore_documentmetastore + searchcore_bucketdb + searchcore_flushengine + searchcore_pcommon + searchcore_grouping + searchcore_proton_metrics + searchcore_fconfig + storageserver_storageapp + messagebus_messagebus-test + messagebus + searchlib_searchlib_uca +) diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp new file mode 100644 index 00000000000..5a0b05f5c54 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -0,0 +1,180 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_cluster.h" +#include "bm_message_bus.h" +#include <vespa/config/common/configcontext.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/messagebus/config-messagebus.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".bmcluster.bm_cluster"); + +using cloud::config::SlobroksConfigBuilder; +using config::ConfigSet; +using messagebus::MessagebusConfigBuilder; +using storage::rpc::SharedRpcResources; + +namespace search::bmcluster { + +namespace { + +vespalib::string message_bus_config_id("bm-message-bus"); +vespalib::string rpc_client_config_id("bm-rpc-client"); + +void +make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) +{ + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); +} + +} + +struct BmCluster::MessageBusConfigSet { + vespalib::string config_id; + SlobroksConfigBuilder slobroks; + MessagebusConfigBuilder messagebus; + + MessageBusConfigSet(const vespalib::string &config_id_in, int slobrok_port) + : config_id(config_id_in), + slobroks(), + messagebus() + { + make_slobroks_config(slobroks, slobrok_port); + } + ~MessageBusConfigSet(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &slobroks); + set.addBuilder(config_id, &messagebus); + } +}; + +BmCluster::MessageBusConfigSet::~MessageBusConfigSet() = default; + +struct BmCluster::RpcClientConfigSet { + vespalib::string config_id; + SlobroksConfigBuilder slobroks; + + RpcClientConfigSet(const vespalib::string &config_id_in, int slobrok_port) + : config_id(config_id_in), + slobroks() + { + make_slobroks_config(slobroks, slobrok_port); + } + ~RpcClientConfigSet(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &slobroks); + } +}; + +BmCluster::RpcClientConfigSet::~RpcClientConfigSet() = default; + +BmCluster::BmCluster(const BmClusterParams& params, std::shared_ptr<const document::DocumentTypeRepo> repo) + : _params(params), + _slobrok_port(9018), + _rpc_client_port(9019), + _message_bus_config(std::make_unique<MessageBusConfigSet>(message_bus_config_id, _slobrok_port)), + _rpc_client_config(std::make_unique<RpcClientConfigSet>(rpc_client_config_id, _slobrok_port)), + _config_set(std::make_unique<config::ConfigSet>()), + _config_context(std::make_shared<config::ConfigContext>(*_config_set)), + _slobrok(), + _message_bus(), + _rpc_client(), + _repo(repo) + +{ + _message_bus_config->add_builders(*_config_set); + _rpc_client_config->add_builders(*_config_set); +} + +BmCluster::~BmCluster() +{ + stop_message_bus(); + stop_rpc_client(); + stop_slobrok(); +} + + +void +BmCluster::start_slobrok() +{ + if (!_slobrok) { + LOG(info, "start slobrok"); + _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); + } +} + +void +BmCluster::stop_slobrok() +{ + if (_slobrok) { + LOG(info, "stop slobrok"); + _slobrok.reset(); + } +} + +void +BmCluster::wait_slobrok(const vespalib::string &name) +{ + auto &mirror = _rpc_client->slobrok_mirror(); + LOG(info, "Waiting for %s in slobrok", name.c_str()); + for (;;) { + auto specs = mirror.lookup(name); + if (!specs.empty()) { + LOG(info, "Found %s in slobrok", name.c_str()); + return; + } + std::this_thread::sleep_for(100ms); + } +} + +void +BmCluster::start_message_bus() +{ + if (!_message_bus) { + LOG(info, "Starting message bus"); + config::ConfigUri config_uri(message_bus_config_id, _config_context); + _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo); + LOG(info, "Started message bus"); + } +} + +void +BmCluster::stop_message_bus() +{ + if (_message_bus) { + LOG(info, "stop message bus"); + _message_bus.reset(); + } +} + +void +BmCluster::start_rpc_client() +{ + if (!_rpc_client) { + LOG(info, "start rpc client"); + config::ConfigUri client_config_uri(rpc_client_config_id, _config_context); + _rpc_client = std::make_unique<SharedRpcResources> + (client_config_uri, _rpc_client_port, 100, _params.get_rpc_events_before_wakeup()); + _rpc_client->start_server_and_register_slobrok(rpc_client_config_id); + } +} + +void +BmCluster::stop_rpc_client() +{ + if (_rpc_client) { + LOG(info, "stop rpc client"); + _rpc_client->shutdown(); + _rpc_client.reset(); + } +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h new file mode 100644 index 00000000000..9a41ec5b415 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h @@ -0,0 +1,55 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> +#include "bm_cluster_params.h" + +namespace config { + +class IConfigContext; +class ConfigSet; + +} + +namespace document { class DocumentTypeRepo; } +namespace mbus { class Slobrok; } +namespace storage::rpc { class SharedRpcResources; } + +namespace search::bmcluster { + +class BmMessageBus; + +/* + * Class representing a benchmark cluster with one or more benchmark nodes. + */ +class BmCluster { + struct MessageBusConfigSet; + struct RpcClientConfigSet; + BmClusterParams _params; + int _slobrok_port; + int _rpc_client_port; + std::unique_ptr<MessageBusConfigSet> _message_bus_config; + std::unique_ptr<RpcClientConfigSet> _rpc_client_config; + std::unique_ptr<config::ConfigSet> _config_set; + std::shared_ptr<config::IConfigContext> _config_context; + std::unique_ptr<mbus::Slobrok> _slobrok; + std::unique_ptr<BmMessageBus> _message_bus; + std::unique_ptr<storage::rpc::SharedRpcResources> _rpc_client; + std::shared_ptr<const document::DocumentTypeRepo> _repo; + +public: + BmCluster(const BmClusterParams& params, std::shared_ptr<const document::DocumentTypeRepo> repo); + ~BmCluster(); + void start_slobrok(); + void stop_slobrok(); + void wait_slobrok(const vespalib::string &name); + void start_message_bus(); + void stop_message_bus(); + void start_rpc_client(); + void stop_rpc_client(); + storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; } + BmMessageBus& get_message_bus() { return *_message_bus; } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp index a1b40c56e11..8d8f89a6fe7 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp @@ -13,7 +13,7 @@ using storage::api::StorageMessageAddress; using storage::rpc::SharedRpcResources; using storage::lib::NodeType; -namespace feedbm { +namespace search::bmcluster { namespace { diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h index 699036be5c9..d4e26e2f4fd 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h @@ -5,7 +5,7 @@ namespace storage::api { class StorageMessageAddress; } namespace storage::rpc { class SharedRpcResources; } -namespace feedbm { +namespace search::bmcluster { /* * Fake cluster controller that sets cluster state to be up. diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp new file mode 100644 index 00000000000..3452ffa8b25 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -0,0 +1,47 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_cluster_params.h" +#include <iostream> + +namespace search::bmcluster { + +BmClusterParams::BmClusterParams() + : _bucket_db_stripe_bits(0), + _distributor_stripes(0), + _enable_distributor(false), + _enable_service_layer(false), + _indexing_sequencer(), + _response_threads(2), // Same default as in stor-filestor.def + _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def + _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def + _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def + _skip_communicationmanager_thread(false), // Same default as in stor-communicationmanager.def + _skip_get_spi_bucket_info(false), + _use_async_message_handling_on_schedule(false), + _use_document_api(false), + _use_message_bus(false), + _use_storage_chain(false) +{ +} + +BmClusterParams::~BmClusterParams() = default; + +bool +BmClusterParams::check() const +{ + if (_response_threads < 1) { + std::cerr << "Too few response threads: " << _response_threads << std::endl; + return false; + } + if (_rpc_network_threads < 1) { + std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; + return false; + } + if (_rpc_targets_per_node < 1) { + std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl; + return false; + } + return true; +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h new file mode 100644 index 00000000000..dcf579452c6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -0,0 +1,67 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> +#include <vespa/vespalib/stllike/string.h> + +namespace search::bmcluster { + +/* + * Parameters for setting up a benchmark cluster. + */ +class BmClusterParams +{ + uint32_t _bucket_db_stripe_bits; + uint32_t _distributor_stripes; + bool _enable_distributor; + bool _enable_service_layer; + vespalib::string _indexing_sequencer; + uint32_t _response_threads; + uint32_t _rpc_events_before_wakeup; + uint32_t _rpc_network_threads; + uint32_t _rpc_targets_per_node; + bool _skip_communicationmanager_thread; + bool _skip_get_spi_bucket_info; + bool _use_async_message_handling_on_schedule; + bool _use_document_api; + bool _use_message_bus; + bool _use_storage_chain; +public: + BmClusterParams(); + ~BmClusterParams(); + uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } + uint32_t get_distributor_stripes() const { return _distributor_stripes; } + bool get_enable_distributor() const { return _enable_distributor; } + const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; } + uint32_t get_response_threads() const { return _response_threads; } + uint32_t get_rpc_events_before_wakeup() const { return _rpc_events_before_wakeup; } + uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } + uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; } + bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; } + bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; } + bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; } + bool get_use_document_api() const { return _use_document_api; } + bool get_use_message_bus() const { return _use_message_bus; } + bool get_use_storage_chain() const { return _use_storage_chain; } + bool needs_distributor() const { return _enable_distributor || _use_document_api; } + bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } + void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } + void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; } + void set_enable_distributor(bool value) { _enable_distributor = value; } + void set_enable_service_layer(bool value) { _enable_service_layer = value; } + void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; } + void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } + void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; } + void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } + void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; } + void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; } + void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; } + void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; } + void set_use_document_api(bool value) { _use_document_api = value; } + void set_use_message_bus(bool value) { _use_message_bus = value; } + void set_use_storage_chain(bool value) { _use_storage_chain = value; } + bool check() const; +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp new file mode 100644 index 00000000000..91f3f916aad --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp @@ -0,0 +1,195 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_feed.h" +#include "bm_range.h" +#include "bucket_selector.h" +#include "pending_tracker.h" +#include "i_bm_feed_handler.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/datatype/documenttype.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".bmcluster.bm_feed"); + +using document::AssignValueUpdate; +using document::Document; +using document::DocumentId; +using document::DocumentType; +using document::DocumentTypeRepo; +using document::DocumentUpdate; +using document::IntFieldValue; +using document::FieldUpdate; + +namespace search::bmcluster { + +BmFeed::BmFeed(std::shared_ptr<const DocumentTypeRepo> repo) + : _repo(std::move(repo)), + _document_type(_repo->getDocumentType("test")), + _field(_document_type->getField("int")), + _bucket_bits(16), + _bucket_space(document::test::makeBucketSpace("test")) +{ +} + +BmFeed::~BmFeed() +{ +} + +DocumentId +BmFeed::make_document_id(uint32_t n, uint32_t i) const +{ + DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i)); + return id; +} + +std::unique_ptr<Document> +BmFeed::make_document(uint32_t n, uint32_t i) const +{ + auto id = make_document_id(n, i); + auto document = std::make_unique<Document>(*_document_type, id); + document->setRepo(*_repo); + document->setFieldValue(_field, std::make_unique<IntFieldValue>(i)); + return document; +} + +std::unique_ptr<DocumentUpdate> +BmFeed::make_document_update(uint32_t n, uint32_t i) const +{ + auto id = make_document_id(n, i); + auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); + document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); + return document_update; +} + +vespalib::nbostream +BmFeed::make_put_feed(BmRange range, BucketSelector bucket_selector) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + auto n = bucket_selector(i); + serialized_feed << make_bucket_id(n); + auto document = make_document(n, i); + document->serialize(serialized_feed); + } + return serialized_feed; +} + +vespalib::nbostream +BmFeed::make_update_feed(BmRange range, BucketSelector bucket_selector) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + auto n = bucket_selector(i); + serialized_feed << make_bucket_id(n); + auto document_update = make_document_update(n, i); + document_update->serializeHEAD(serialized_feed); + } + return serialized_feed; +} + +vespalib::nbostream +BmFeed::make_remove_feed(BmRange range, BucketSelector bucket_selector) +{ + vespalib::nbostream serialized_feed; + LOG(debug, "make_remove_feed([%u..%u))", range.get_start(), range.get_end()); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + auto n = bucket_selector(i); + serialized_feed << make_bucket_id(n); + auto document_id = make_document_id(n, i); + vespalib::string raw_id = document_id.toString(); + serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); + } + return serialized_feed; +} + + +void +BmFeed::put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + feed_handler.attach_bucket_info_queue(pending_tracker); + auto &repo = *_repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + auto document = std::make_unique<Document>(repo, is); + feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeed::update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end()); + PendingTracker pending_tracker(max_pending); + feed_handler.attach_bucket_info_queue(pending_tracker); + auto &repo = *_repo; + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + auto document_update = DocumentUpdate::createHEAD(repo, is); + feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeed::get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed) +{ + LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end()); + search::bmcluster::PendingTracker pending_tracker(max_pending); + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + vespalib::string all_fields(document::AllFields::NAME); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + DocumentId document_id(is); + feed_handler.get(bucket, all_fields, document_id, pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +void +BmFeed::remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias) +{ + LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end()); + search::bmcluster::PendingTracker pending_tracker(max_pending); + feed_handler.attach_bucket_info_queue(pending_tracker); + vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); + document::BucketId bucket_id; + bool use_timestamp = !feed_handler.manages_timestamp(); + for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { + is >> bucket_id; + document::Bucket bucket(_bucket_space, bucket_id); + DocumentId document_id(is); + feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker); + } + assert(is.empty()); + pending_tracker.drain(); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h new file mode 100644 index 00000000000..cab22651251 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h @@ -0,0 +1,57 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/bucketspace.h> +#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> + +namespace document { + +class Document; +class DocumentType; +class DocumentTypeRepo; +class DocumentUpdate; +class Field; + +} + +namespace vespalib { class nbostream; } + +namespace search::bmcluster { + +class BmRange; +class BucketSelector; +class IBmFeedHandler; + +/* + * Class to generate synthetic feed of documents. + */ +class BmFeed { + std::shared_ptr<const document::DocumentTypeRepo> _repo; + const document::DocumentType* _document_type; + const document::Field& _field; + uint32_t _bucket_bits; + document::BucketSpace _bucket_space; +public: + + BmFeed(std::shared_ptr<const document::DocumentTypeRepo> document_types); + ~BmFeed(); + uint32_t num_buckets() const { return (1u << _bucket_bits); } + document::BucketSpace get_bucket_space() const noexcept { return _bucket_space; } + document::BucketId make_bucket_id(uint32_t n) const { return document::BucketId(_bucket_bits, n & (num_buckets() - 1)); } + document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); } + document::DocumentId make_document_id(uint32_t n, uint32_t i) const; + std::unique_ptr<document::Document> make_document(uint32_t n, uint32_t i) const; + std::unique_ptr<document::DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; + vespalib::nbostream make_put_feed(BmRange range, BucketSelector bucket_selector); + vespalib::nbostream make_update_feed(BmRange range, BucketSelector bucket_selector); + vespalib::nbostream make_remove_feed(BmRange range, BucketSelector bucket_selector); + void put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + void update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); + void get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed); + void remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp index b608593dada..fc6acbcab12 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp @@ -24,7 +24,7 @@ using mbus::SourceSession; using storage::mbusprot::StorageProtocol; using storage::mbusprot::StorageReply; -namespace feedbm { +namespace search::bmcluster { namespace { diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h index a9cff1fb826..7829a4e4946 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h @@ -16,12 +16,12 @@ class SourceSession; } -namespace feedbm { +namespace search::bmcluster { class PendingTracker; /* - * Message bus for feed benchmark program. + * Message bus for benchmark cluster. */ class BmMessageBus { diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp new file mode 100644 index 00000000000..808ca348622 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -0,0 +1,675 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_node.h" +#include "bm_cluster.h" +#include "bm_cluster_params.h" +#include "bm_message_bus.h" +#include "bm_storage_chain_builder.h" +#include "bm_storage_link_context.h" +#include "storage_api_chain_bm_feed_handler.h" +#include "storage_api_message_bus_bm_feed_handler.h" +#include "storage_api_rpc_bm_feed_handler.h" +#include "document_api_message_bus_bm_feed_handler.h" +#include "i_bm_feed_handler.h" +#include "spi_bm_feed_handler.h" +#include <vespa/config-attributes.h> +#include <vespa/config-bucketspaces.h> +#include <vespa/config-imported-fields.h> +#include <vespa/config-indexschema.h> +#include <vespa/config-persistence.h> +#include <vespa/config-rank-profiles.h> +#include <vespa/config-slobroks.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config-stor-filestor.h> +#include <vespa/config-summary.h> +#include <vespa/config-summarymap.h> +#include <vespa/config-upgrading.h> +#include <vespa/config/common/configcontext.h> +#include <vespa/document/bucket/bucketspace.h> +#include <vespa/document/fieldset/fieldsetrepo.h> +#include <vespa/document/repo/configbuilder.h> +#include <vespa/document/repo/document_type_repo_factory.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/messagebus/config-messagebus.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/metrics/config-metricsmanager.h> +#include <vespa/searchcommon/common/schemaconfigurer.h> +#include <vespa/searchcore/proton/common/alloc_config.h> +#include <vespa/searchcore/proton/matching/querylimiter.h> +#include <vespa/searchcore/proton/metrics/metricswireservice.h> +#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> +#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> +#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> +#include <vespa/searchcore/proton/server/bootstrapconfig.h> +#include <vespa/searchcore/proton/server/documentdb.h> +#include <vespa/searchcore/proton/server/document_db_maintenance_config.h> +#include <vespa/searchcore/proton/server/documentdbconfigmanager.h> +#include <vespa/searchcore/proton/server/fileconfigmanager.h> +#include <vespa/searchcore/proton/server/memoryconfigstore.h> +#include <vespa/searchcore/proton/server/persistencehandlerproxy.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/searchlib/transactionlog/translogserver.h> +#include <vespa/searchsummary/config/config-juniperrc.h> +#include <vespa/storage/bucketdb/config-stor-bucket-init.h> +#include <vespa/storage/common/i_storage_chain_builder.h> +#include <vespa/storage/config/config-stor-bouncer.h> +#include <vespa/storage/config/config-stor-communicationmanager.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/config/config-stor-opslogger.h> +#include <vespa/storage/config/config-stor-prioritymapping.h> +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/config/config-stor-status.h> +#include <vespa/storage/config/config-stor-visitordispatcher.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/storageserver/app/distributorprocess.h> +#include <vespa/storageserver/app/servicelayerprocess.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/size_literals.h> +#include <tests/proton/common/dummydbowner.h> + +#include <vespa/log/log.h> +LOG_SETUP(".bmcluster.bm_node"); + +using cloud::config::SlobroksConfigBuilder; +using cloud::config::filedistribution::FiledistributorrpcConfig; +using config::ConfigSet; +using document::BucketSpace; +using document::DocumenttypesConfig; +using document::DocumenttypesConfigBuilder; +using document::DocumentType; +using document::DocumentTypeRepo; +using document::Field; +using messagebus::MessagebusConfigBuilder; +using metrics::MetricsmanagerConfigBuilder; +using proton::BootstrapConfig; +using proton::DocTypeName; +using proton::DocumentDB; +using proton::DocumentDBConfig; +using proton::HwInfo; +using search::index::Schema; +using search::index::SchemaBuilder; +using search::transactionlog::TransLogServer; +using storage::rpc::SharedRpcResources; +using storage::rpc::StorageApiRpcService; +using storage::spi::PersistenceProvider; +using vespa::config::content::PersistenceConfigBuilder; +using vespa::config::content::StorDistributionConfigBuilder; +using vespa::config::content::StorFilestorConfigBuilder; +using vespa::config::content::UpgradingConfigBuilder; +using vespa::config::content::core::BucketspacesConfig; +using vespa::config::content::core::BucketspacesConfigBuilder; +using vespa::config::content::core::StorBouncerConfigBuilder; +using vespa::config::content::core::StorBucketInitConfigBuilder; +using vespa::config::content::core::StorCommunicationmanagerConfigBuilder; +using vespa::config::content::core::StorDistributormanagerConfigBuilder; +using vespa::config::content::core::StorOpsloggerConfigBuilder; +using vespa::config::content::core::StorPrioritymappingConfigBuilder; +using vespa::config::content::core::StorServerConfigBuilder; +using vespa::config::content::core::StorStatusConfigBuilder; +using vespa::config::content::core::StorVisitorConfigBuilder; +using vespa::config::content::core::StorVisitordispatcherConfigBuilder; +using vespa::config::search::AttributesConfig; +using vespa::config::search::AttributesConfigBuilder; +using vespa::config::search::ImportedFieldsConfig; +using vespa::config::search::IndexschemaConfig; +using vespa::config::search::RankProfilesConfig; +using vespa::config::search::SummaryConfig; +using vespa::config::search::SummarymapConfig; +using vespa::config::search::core::ProtonConfig; +using vespa::config::search::core::ProtonConfigBuilder; +using vespa::config::search::summary::JuniperrcConfig; +using vespalib::compression::CompressionConfig; + +namespace search::bmcluster { + +vespalib::string base_dir = "testdb"; + +std::shared_ptr<AttributesConfig> make_attributes_config() { + AttributesConfigBuilder builder; + AttributesConfig::Attribute attribute; + attribute.name = "int"; + attribute.datatype = AttributesConfig::Attribute::Datatype::INT32; + builder.attribute.emplace_back(attribute); + return std::make_shared<AttributesConfig>(builder); +} + +std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name) +{ + auto indexschema = std::make_shared<IndexschemaConfig>(); + auto attributes = make_attributes_config(); + auto summary = std::make_shared<SummaryConfig>(); + std::shared_ptr<Schema> schema(new Schema()); + SchemaBuilder::build(*indexschema, *schema); + SchemaBuilder::build(*attributes, *schema); + SchemaBuilder::build(*summary, *schema); + return std::make_shared<DocumentDBConfig>( + 1, + std::make_shared<RankProfilesConfig>(), + std::make_shared<proton::matching::RankingConstants>(), + std::make_shared<proton::matching::RankingExpressions>(), + std::make_shared<proton::matching::OnnxModels>(), + indexschema, + attributes, + summary, + std::make_shared<SummarymapConfig>(), + std::make_shared<JuniperrcConfig>(), + document_types, + repo, + std::make_shared<ImportedFieldsConfig>(), + std::make_shared<TuneFileDocumentDB>(), + schema, + std::make_shared<proton::DocumentDBMaintenanceConfig>(), + search::LogDocumentStore::Config(), + std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make(1)), + std::make_shared<const proton::AllocConfig>(), + "client", + doc_type_name.getName()); +} + +void +make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) +{ + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); +} + +void +make_bucketspaces_config(BucketspacesConfigBuilder& bucketspaces) +{ + BucketspacesConfigBuilder::Documenttype bucket_space_map; + bucket_space_map.name = "test"; + bucket_space_map.bucketspace = "default"; + bucketspaces.documenttype.emplace_back(std::move(bucket_space_map)); +} + +class MyPersistenceEngineOwner : public proton::IPersistenceEngineOwner +{ + void setClusterState(BucketSpace, const storage::spi::ClusterState&) override { } +}; + +struct MyResourceWriteFilter : public proton::IResourceWriteFilter +{ + bool acceptWriteOperation() const override { return true; } + State getAcceptState() const override { return IResourceWriteFilter::State(); } +}; + +class MyServiceLayerProcess : public storage::ServiceLayerProcess { + PersistenceProvider& _provider; + +public: + MyServiceLayerProcess(const config::ConfigUri& configUri, + PersistenceProvider& provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder); + ~MyServiceLayerProcess() override { shutdown(); } + + void shutdown() override; + void setupProvider() override; + PersistenceProvider& getProvider() override; +}; + +MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri& configUri, + PersistenceProvider& provider, + std::unique_ptr<storage::IStorageChainBuilder> chain_builder) + : ServiceLayerProcess(configUri), + _provider(provider) +{ + if (chain_builder) { + set_storage_chain_builder(std::move(chain_builder)); + } +} + +void +MyServiceLayerProcess::shutdown() +{ + ServiceLayerProcess::shutdown(); +} + +void +MyServiceLayerProcess::setupProvider() +{ +} + +PersistenceProvider& +MyServiceLayerProcess::getProvider() +{ + return _provider; +} + +struct StorageConfigSet +{ + vespalib::string config_id; + DocumenttypesConfigBuilder documenttypes; + StorDistributionConfigBuilder stor_distribution; + StorBouncerConfigBuilder stor_bouncer; + StorCommunicationmanagerConfigBuilder stor_communicationmanager; + StorOpsloggerConfigBuilder stor_opslogger; + StorPrioritymappingConfigBuilder stor_prioritymapping; + UpgradingConfigBuilder upgrading; + StorServerConfigBuilder stor_server; + StorStatusConfigBuilder stor_status; + BucketspacesConfigBuilder bucketspaces; + MetricsmanagerConfigBuilder metricsmanager; + SlobroksConfigBuilder slobroks; + MessagebusConfigBuilder messagebus; + + StorageConfigSet(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) + : config_id(config_id_in), + documenttypes(documenttypes_in), + stor_distribution(), + stor_bouncer(), + stor_communicationmanager(), + stor_opslogger(), + stor_prioritymapping(), + upgrading(), + stor_server(), + stor_status(), + bucketspaces(), + metricsmanager(), + slobroks(), + messagebus() + { + { + auto& dc = stor_distribution; + { + StorDistributionConfigBuilder::Group group; + { + StorDistributionConfigBuilder::Group::Nodes node; + node.index = 0; + group.nodes.push_back(std::move(node)); + } + group.index = "invalid"; + group.name = "invalid"; + group.capacity = 1.0; + group.partitions = ""; + dc.group.push_back(std::move(group)); + } + dc.redundancy = 1; + dc.readyCopies = 1; + } + stor_server.isDistributor = distributor; + stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); + if (distributor) { + stor_server.rootFolder = "distributor"; + } else { + stor_server.rootFolder = "storage"; + } + make_slobroks_config(slobroks, slobrok_port); + stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads(); + stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakeup(); + stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node(); + stor_communicationmanager.mbusport = mbus_port; + stor_communicationmanager.rpcport = rpc_port; + stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread(); + + stor_status.httpport = status_port; + make_bucketspaces_config(bucketspaces); + } + + ~StorageConfigSet(); + + void add_builders(ConfigSet& set) { + set.addBuilder(config_id, &documenttypes); + set.addBuilder(config_id, &stor_distribution); + set.addBuilder(config_id, &stor_bouncer); + set.addBuilder(config_id, &stor_communicationmanager); + set.addBuilder(config_id, &stor_opslogger); + set.addBuilder(config_id, &stor_prioritymapping); + set.addBuilder(config_id, &upgrading); + set.addBuilder(config_id, &stor_server); + set.addBuilder(config_id, &stor_status); + set.addBuilder(config_id, &bucketspaces); + set.addBuilder(config_id, &metricsmanager); + set.addBuilder(config_id, &slobroks); + set.addBuilder(config_id, &messagebus); + } +}; + +StorageConfigSet::~StorageConfigSet() = default; + +struct ServiceLayerConfigSet : public StorageConfigSet +{ + PersistenceConfigBuilder persistence; + StorFilestorConfigBuilder stor_filestor; + StorBucketInitConfigBuilder stor_bucket_init; + StorVisitorConfigBuilder stor_visitor; + + ServiceLayerConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) + : StorageConfigSet(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), + persistence(), + stor_filestor(), + stor_bucket_init(), + stor_visitor() + { + stor_filestor.numResponseThreads = params.get_response_threads(); + stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); + stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); + } + + ~ServiceLayerConfigSet(); + + void add_builders(ConfigSet& set) { + StorageConfigSet::add_builders(set); + set.addBuilder(config_id, &persistence); + set.addBuilder(config_id, &stor_filestor); + set.addBuilder(config_id, &stor_bucket_init); + set.addBuilder(config_id, &stor_visitor); + } +}; + +ServiceLayerConfigSet::~ServiceLayerConfigSet() = default; + +struct DistributorConfigSet : public StorageConfigSet +{ + StorDistributormanagerConfigBuilder stor_distributormanager; + StorVisitordispatcherConfigBuilder stor_visitordispatcher; + + DistributorConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) + : StorageConfigSet(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), + stor_distributormanager(), + stor_visitordispatcher() + { + stor_distributormanager.numDistributorStripes = params.get_distributor_stripes(); + } + + ~DistributorConfigSet(); + + void add_builders(ConfigSet& set) { + StorageConfigSet::add_builders(set); + set.addBuilder(config_id, &stor_distributormanager); + set.addBuilder(config_id, &stor_visitordispatcher); + } +}; + +DistributorConfigSet::~DistributorConfigSet() = default; + +BmNode::BmNode(std::shared_ptr<document::DocumenttypesConfig> document_types) + : _document_types(std::move(document_types)), + _repo(document::DocumentTypeRepoFactory::make(*_document_types)), + _doc_type_name("test"), + _document_type(_repo->getDocumentType(_doc_type_name.getName())), + _field(_document_type->getField("int")) +{ +} + +BmNode::~BmNode() = default; + +class MyBmNode : public BmNode +{ + std::shared_ptr<DocumentDBConfig> _document_db_config; + vespalib::string _base_dir; + search::index::DummyFileHeaderContext _file_header_context; + int _tls_listen_port; + int _slobrok_port; + int _service_layer_mbus_port; + int _service_layer_rpc_port; + int _service_layer_status_port; + int _distributor_mbus_port; + int _distributor_rpc_port; + int _distributor_status_port; + TransLogServer _tls; + vespalib::string _tls_spec; + proton::matching::QueryLimiter _query_limiter; + vespalib::Clock _clock; + proton::DummyWireService _metrics_wire_service; + proton::MemoryConfigStores _config_stores; + vespalib::ThreadStackExecutor _summary_executor; + proton::DummyDBOwner _document_db_owner; + BucketSpace _bucket_space; + std::shared_ptr<DocumentDB> _document_db; + MyPersistenceEngineOwner _persistence_owner; + MyResourceWriteFilter _write_filter; + proton::test::DiskMemUsageNotifier _disk_mem_usage_notifier; + std::shared_ptr<proton::PersistenceEngine> _persistence_engine; + std::unique_ptr<const document::FieldSetRepo> _field_set_repo; + ServiceLayerConfigSet _service_layer_config; + DistributorConfigSet _distributor_config; + ConfigSet _config_set; + std::shared_ptr<config::IConfigContext> _config_context; + std::unique_ptr<IBmFeedHandler> _feed_handler; + std::unique_ptr<mbus::Slobrok> _slobrok; + std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context; + std::unique_ptr<MyServiceLayerProcess> _service_layer; + std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; + std::unique_ptr<storage::DistributorProcess> _distributor; + + void create_document_db(const BmClusterParams& params); +public: + MyBmNode(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types); + ~MyBmNode() override; + std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) override; + void start_service_layer(const BmClusterParams& params) override; + void wait_service_layer() override; + void start_distributor(const BmClusterParams& params) override; + void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) override; + void shutdown_feed_handler() override; + void shutdown_distributor() override; + void shutdown_service_layer() override; + IBmFeedHandler* get_feed_handler() override; + PersistenceProvider* get_persistence_provider() override; +}; + +MyBmNode::MyBmNode(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types) + : BmNode(std::move(document_types)), + _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)), + _base_dir(base_dir), + _file_header_context(), + _tls_listen_port(9017), + _slobrok_port(9018), + _service_layer_mbus_port(9020), + _service_layer_rpc_port(9021), + _service_layer_status_port(9022), + _distributor_mbus_port(9023), + _distributor_rpc_port(9024), + _distributor_status_port(9025), + _tls("tls", _tls_listen_port, _base_dir, _file_header_context), + _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), + _query_limiter(), + _clock(), + _metrics_wire_service(), + _config_stores(), + _summary_executor(8, 128_Ki), + _document_db_owner(), + _bucket_space(document::test::makeBucketSpace(_doc_type_name.getName())), + _document_db(), + _persistence_owner(), + _write_filter(), + _disk_mem_usage_notifier(), + _persistence_engine(), + _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)), + _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), + _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), + _config_set(), + _config_context(std::make_shared<config::ConfigContext>(_config_set)), + _feed_handler(), + _slobrok(), + _service_layer_chain_context(), + _service_layer(), + _distributor_chain_context(), + _distributor() +{ + create_document_db(params); + _persistence_engine = std::make_unique<proton::PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false); + auto proxy = std::make_shared<proton::PersistenceHandlerProxy>(_document_db); + _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); + _service_layer_config.add_builders(_config_set); + _distributor_config.add_builders(_config_set); + _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info()); +} + +MyBmNode::~MyBmNode() +{ + if (_persistence_engine) { + _persistence_engine->destroyIterators(); + _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name); + } + if (_document_db) { + _document_db->close(); + } +} + + +void +MyBmNode::create_document_db(const BmClusterParams& params) +{ + vespalib::mkdir(_base_dir, false); + vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false); + vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig"; + { + proton::FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName()); + fileCfg.saveConfig(*_document_db_config, 1); + } + config::DirSpec spec(input_cfg + "/config-1"); + auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>(); + proton::DocumentDBConfigHelper mgr(spec, _doc_type_name.getName()); + auto protonCfg = std::make_shared<ProtonConfigBuilder>(); + if ( ! params.get_indexing_sequencer().empty()) { + vespalib::string sequencer = params.get_indexing_sequencer(); + std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); }); + protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer); + } + auto bootstrap_config = std::make_shared<BootstrapConfig>(1, + _document_types, + _repo, + std::move(protonCfg), + std::make_shared<FiledistributorrpcConfig>(), + std::make_shared<BucketspacesConfig>(), + tuneFileDocDB, HwInfo()); + mgr.forwardConfig(bootstrap_config); + mgr.nextGeneration(0ms); + _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, + _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, + _summary_executor, _summary_executor, *_persistence_engine, _tls, + _metrics_wire_service, _file_header_context, + _config_stores.getConfigStore(_doc_type_name.toString()), + std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo()); + _document_db->start(); + _document_db->waitForOnlineState(); +} + +std::unique_ptr<SpiBmFeedHandler> +MyBmNode::make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) +{ + return std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, skip_get_spi_bucket_info); +} + +void +MyBmNode::start_service_layer(const BmClusterParams& params) +{ + config::ConfigUri config_uri("bm-servicelayer", _config_context); + std::unique_ptr<BmStorageChainBuilder> chain_builder; + if (params.get_use_storage_chain() && !params.needs_distributor()) { + chain_builder = std::make_unique<BmStorageChainBuilder>(); + _service_layer_chain_context = chain_builder->get_context(); + } + _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, + *_persistence_engine, + std::move(chain_builder)); + _service_layer->setupConfig(100ms); + _service_layer->createNode(); +} + +void +MyBmNode::wait_service_layer() +{ + _service_layer->getNode().waitUntilInitialized(); +} + +void +MyBmNode::start_distributor(const BmClusterParams& params) +{ + config::ConfigUri config_uri("bm-distributor", _config_context); + std::unique_ptr<BmStorageChainBuilder> chain_builder; + if (params.get_use_storage_chain() && !params.get_use_document_api()) { + chain_builder = std::make_unique<BmStorageChainBuilder>(); + _distributor_chain_context = chain_builder->get_context(); + } + _distributor = std::make_unique<storage::DistributorProcess>(config_uri); + if (chain_builder) { + _distributor->set_storage_chain_builder(std::move(chain_builder)); + } + _distributor->setupConfig(100ms); + _distributor->createNode(); +} + +void +MyBmNode::create_feed_handler(const BmClusterParams& params, BmCluster& cluster) +{ + StorageApiRpcService::Params rpc_params; + // This is the same compression config as the default in stor-communicationmanager.def. + rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); + rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node(); + if (params.get_use_document_api()) { + _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(cluster.get_message_bus()); + } else if (params.get_enable_distributor()) { + if (params.get_use_storage_chain()) { + assert(_distributor_chain_context); + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), true); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, true); + } + } else if (params.needs_service_layer()) { + if (params.get_use_storage_chain()) { + assert(_service_layer_chain_context); + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), false); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, false); + } + } +} + +void +MyBmNode::shutdown_feed_handler() +{ + _feed_handler.reset(); +} + +void +MyBmNode::shutdown_distributor() +{ + if (_distributor) { + LOG(info, "stop distributor"); + _distributor->getNode().requestShutdown("controlled shutdown"); + _distributor->shutdown(); + } +} + +void +MyBmNode::shutdown_service_layer() +{ + if (_service_layer) { + LOG(info, "stop service layer"); + _service_layer->getNode().requestShutdown("controlled shutdown"); + _service_layer->shutdown(); + } +} + +IBmFeedHandler* +MyBmNode::get_feed_handler() +{ + return _feed_handler.get(); +} + +PersistenceProvider* +MyBmNode::get_persistence_provider() +{ + return _persistence_engine.get(); +} + +std::unique_ptr<BmNode> +BmNode::create(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types) +{ + return std::make_unique<MyBmNode>(params, std::move(document_types)); +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h new file mode 100644 index 00000000000..1212aeb4b5a --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h @@ -0,0 +1,56 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> +#include <vespa/document/config/config-documenttypes.h> +#include <vespa/searchcore/proton/common/doctypename.h> + +namespace document { + +class DocumentTypeRepo; +class DocumentType; +class Field; + +}; + +namespace storage::spi { struct PersistenceProvider; } + +namespace search::bmcluster { + +class BmCluster; +class BmClusterParams; +class IBmFeedHandler; +class SpiBmFeedHandler; + +/* + * Class representing a single benchmark node in a benchmark cluster. + */ +class BmNode { +protected: + std::shared_ptr<document::DocumenttypesConfig> _document_types; + std::shared_ptr<const document::DocumentTypeRepo> _repo; + proton::DocTypeName _doc_type_name; + const document::DocumentType* _document_type; + const document::Field& _field; + + BmNode(std::shared_ptr<document::DocumenttypesConfig> document_types); +public: + virtual ~BmNode(); + virtual std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) = 0; + virtual void start_service_layer(const BmClusterParams& params) = 0; + virtual void wait_service_layer() = 0; + virtual void start_distributor(const BmClusterParams& params) = 0; + virtual void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) = 0; + virtual void shutdown_feed_handler() = 0; + virtual void shutdown_distributor() = 0; + virtual void shutdown_service_layer() = 0; + virtual IBmFeedHandler* get_feed_handler() = 0; + virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0; + static std::unique_ptr<BmNode> create(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types); + const proton::DocTypeName& get_doc_type_name() const noexcept { return _doc_type_name; } + const document::DocumentType *get_document_type() const noexcept { return _document_type; } + const document::Field& get_field() const noexcept { return _field; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_range.h b/searchcore/src/vespa/searchcore/bmcluster/bm_range.h new file mode 100644 index 00000000000..c4834f8c17b --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_range.h @@ -0,0 +1,24 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace search::bmcluster { + +/* + * Range of document "keys" used to generate documents + */ +class BmRange +{ + uint32_t _start; + uint32_t _end; +public: + BmRange(uint32_t start_in, uint32_t end_in) + : _start(start_in), + _end(end_in) + { + } + uint32_t get_start() const { return _start; } + uint32_t get_end() const { return _end; } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp index bbe0de70ce2..16883e1cc48 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp @@ -7,7 +7,7 @@ #include <vespa/log/log.h> LOG_SETUP(".bm_storage_chain_builder"); -namespace feedbm { +namespace search::bmcluster { BmStorageChainBuilder::BmStorageChainBuilder() : storage::StorageChainBuilder(), diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h index bba933da9e0..c61cb200c36 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h @@ -4,7 +4,7 @@ #include <vespa/storage/common/storage_chain_builder.h> -namespace feedbm { +namespace search::bmcluster { struct BmStorageLinkContext; diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp index 2aeda91c30c..c251c25b15d 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp @@ -3,7 +3,7 @@ #include "bm_storage_link.h" #include "pending_tracker.h" -namespace feedbm { +namespace search::bmcluster { BmStorageLink::BmStorageLink() diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h index 95528d7b2d9..8c98479a38b 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h @@ -6,7 +6,7 @@ #include "pending_tracker_hash.h" #include <vespa/storage/common/storagelink.h> -namespace feedbm { +namespace search::bmcluster { class PendingTracker; diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h index f2df20f1f66..f7cc1841770 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h @@ -1,6 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -namespace feedbm { +namespace search::bmcluster { class BmStorageLink; diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp index fc43402d68e..6670707ed39 100644 --- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp @@ -3,7 +3,7 @@ #include "bucket_info_queue.h" #include <vespa/persistence/spi/persistenceprovider.h> -namespace feedbm { +namespace search::bmcluster { BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors) : _mutex(), diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h index 07a55127234..1a48f9fa478 100644 --- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h @@ -9,7 +9,7 @@ namespace storage::spi { struct PersistenceProvider; } -namespace feedbm { +namespace search::bmcluster { /* * Class containing a queue of buckets where mutating feed operations diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h new file mode 100644 index 00000000000..7401a12cba0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h @@ -0,0 +1,28 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace search::bmcluster { + +/* + * Map from document index to bucket to ensure even spread between buckets + * while ensuring that each bucket used belong to a specific thread. + */ +class BucketSelector +{ + uint32_t _thread_id; + uint32_t _threads; + uint32_t _num_buckets; +public: + BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in) + : _thread_id(thread_id_in), + _threads(threads_in), + _num_buckets((num_buckets_in / _threads) * _threads) + { + } + uint64_t operator()(uint32_t i) const { + return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; + } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp index 38c8490de69..c6f2626f27c 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp @@ -17,7 +17,7 @@ using document::DocumentUpdate; using storage::api::StorageMessageAddress; using storage::lib::NodeType; -namespace feedbm { +namespace search::bmcluster { namespace { vespalib::string _Storage("storage"); diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h index c71bb113c5b..5358e0a948b 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h @@ -9,7 +9,7 @@ namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; namespace storage::api { class StorageMessageAddress; } -namespace feedbm { +namespace search::bmcluster { class BmMessageBus; diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h index 26cbf27b455..fc3953c49a5 100644 --- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h @@ -12,7 +12,7 @@ class DocumentUpdate; class DocumentId; } -namespace feedbm { +namespace search::bmcluster { class BucketInfoQueue; class PendingTracker; diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp index 94bed4cb3bd..247bf8bece3 100644 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp @@ -7,7 +7,7 @@ using namespace std::chrono_literals; -namespace feedbm { +namespace search::bmcluster { PendingTracker::PendingTracker(uint32_t limit) : _pending(0u), diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h index 4ca84ab7442..a8fa2f77396 100644 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h @@ -7,7 +7,7 @@ namespace storage::spi { struct PersistenceProvider; } -namespace feedbm { +namespace search::bmcluster { class BucketInfoQueue; diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp index 6863d35703e..515f7f6b2de 100644 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp @@ -5,7 +5,7 @@ #include <vespa/vespalib/stllike/hash_map.hpp> #include <cassert> -namespace feedbm { +namespace search::bmcluster { PendingTrackerHash::PendingTrackerHash() : _mutex(), diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h index 89be93fd4ed..de9b6f63aa4 100644 --- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h @@ -5,7 +5,7 @@ #include <vespa/vespalib/stllike/hash_map.h> #include <mutex> -namespace feedbm { +namespace search::bmcluster { class PendingTracker; diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp index 11149eecb3f..2aa7ee5ec47 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp @@ -15,7 +15,7 @@ using storage::spi::Bucket; using storage::spi::PersistenceProvider; using storage::spi::Timestamp; -namespace feedbm { +namespace search::bmcluster { namespace { diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h index a78aa06628b..3a24a5b38b7 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h @@ -8,7 +8,7 @@ namespace document { class FieldSetRepo; } namespace storage::spi { struct PersistenceProvider; } -namespace feedbm { +namespace search::bmcluster { /* * Benchmark feed handler for feed directly to persistence provider diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp index 82cf2df065f..34669b8cbdc 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp @@ -18,7 +18,7 @@ using document::Document; using document::DocumentId; using document::DocumentUpdate; -namespace feedbm { +namespace search::bmcluster { namespace { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h index 0c4b715122e..1c196d746eb 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h @@ -6,7 +6,7 @@ namespace storage::api { class StorageCommand; } -namespace feedbm { +namespace search::bmcluster { struct BmStorageLinkContext; diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp index f63a8e33cc0..04561b5d93e 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp @@ -15,7 +15,7 @@ using document::DocumentUpdate; using storage::api::StorageMessageAddress; using storage::lib::NodeType; -namespace feedbm { +namespace search::bmcluster { namespace { vespalib::string _Storage("storage"); diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h index 2aafd0c6830..0027f260b8f 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h @@ -12,7 +12,7 @@ class StorageCommand; class StorageMessageAddress; } -namespace feedbm { +namespace search::bmcluster { class BmMessageBus; diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp index 04d49bba0a3..3e0426cb308 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp @@ -22,7 +22,7 @@ using storage::rpc::SharedRpcResources; using storage::rpc::StorageApiRpcService; using storage::lib::NodeType; -namespace feedbm { +namespace search::bmcluster { namespace { vespalib::string _Storage("storage"); diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h index 5057d8889a5..360f702e590 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h @@ -16,7 +16,7 @@ class MessageCodecProvider; class SharedRpcResources; } -namespace feedbm { +namespace search::bmcluster { /* * Benchmark feed handler for feed to service layer or distributor diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp index 260b0c8a7af..ec1ebec2954 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp @@ -6,7 +6,7 @@ #include <vespa/log/log.h> LOG_SETUP(".storage_reply_error_checker"); -namespace feedbm { +namespace search::bmcluster { StorageReplyErrorChecker::StorageReplyErrorChecker() : _errors(0u) diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h index 4743367b426..2fcb6aad14a 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h @@ -6,7 +6,7 @@ namespace storage::api { class StorageMessage; } -namespace feedbm { +namespace search::bmcluster { class StorageReplyErrorChecker { protected: |