aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-10 11:30:08 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-10 11:30:08 +0200
commitc8beeb61f595019c22ee6eca137ab47625c5712a (patch)
treeba034cbf4fcee3b077dfad266e24460c9c451938 /searchcore
parentbb4ee4e9c053ca4f341eaa5490a850e13ea37f5c (diff)
Start moving portions of vespa-feed-bm app to searchcore_bmcluster library.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/apps/vespa-feed-bm/CMakeLists.txt34
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp1073
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt42
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp180
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h55
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp47
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h67
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp195
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feed.h57
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_message_bus.h)4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp675
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.h56
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_range.h24
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h (renamed from searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp (renamed from searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h (renamed from searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h28
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h (renamed from searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp (renamed from searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp)2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h (renamed from searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h)2
42 files changed, 1556 insertions, 1036 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 2d5eb8dbc4f..c76f35bd9ff 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -20,6 +20,7 @@ vespa_define_module(
fileacquirer
LIBS
+ src/vespa/searchcore/bmcluster
src/vespa/searchcore/config
src/vespa/searchcore/grouping
src/vespa/searchcore/proton/attribute
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index fe83c89d83a..daefef5d413 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -2,39 +2,7 @@
vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
- bm_cluster_controller.cpp
- bm_message_bus.cpp
- bm_storage_chain_builder.cpp
- bm_storage_link.cpp
- bucket_info_queue.cpp
- document_api_message_bus_bm_feed_handler.cpp
- pending_tracker.cpp
- pending_tracker_hash.cpp
- spi_bm_feed_handler.cpp
- storage_api_chain_bm_feed_handler.cpp
- storage_api_message_bus_bm_feed_handler.cpp
- storage_api_rpc_bm_feed_handler.cpp
- storage_reply_error_checker.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
- searchcore_server
- searchcore_initializer
- searchcore_reprocessing
- searchcore_index
- searchcore_persistenceengine
- searchcore_docsummary
- searchcore_feedoperation
- searchcore_matching
- searchcore_attribute
- searchcore_documentmetastore
- searchcore_bucketdb
- searchcore_flushengine
- searchcore_pcommon
- searchcore_grouping
- searchcore_proton_metrics
- searchcore_fconfig
- storageserver_storageapp
- messagebus_messagebus-test
- messagebus
- searchlib_searchlib_uca
+ searchcore_bmcluster
)
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index cd1920d237f..9752a4b5c36 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,80 +1,25 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "bm_cluster_controller.h"
-#include "bm_message_bus.h"
-#include "bm_storage_chain_builder.h"
-#include "bm_storage_link_context.h"
-#include "pending_tracker.h"
-#include "spi_bm_feed_handler.h"
-#include "storage_api_chain_bm_feed_handler.h"
-#include "storage_api_message_bus_bm_feed_handler.h"
-#include "storage_api_rpc_bm_feed_handler.h"
-#include "document_api_message_bus_bm_feed_handler.h"
-#include <tests/proton/common/dummydbowner.h>
-#include <vespa/config-attributes.h>
-#include <vespa/config-bucketspaces.h>
-#include <vespa/config-imported-fields.h>
-#include <vespa/config-indexschema.h>
-#include <vespa/config-persistence.h>
-#include <vespa/config-rank-profiles.h>
-#include <vespa/config-slobroks.h>
-#include <vespa/config-stor-distribution.h>
-#include <vespa/config-stor-filestor.h>
-#include <vespa/config-summary.h>
-#include <vespa/config-summarymap.h>
-#include <vespa/config-upgrading.h>
-#include <vespa/config/common/configcontext.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/document/update/assignvalueupdate.h>
-#include <vespa/document/update/documentupdate.h>
#include <vespa/fastos/app.h>
-#include <vespa/messagebus/config-messagebus.h>
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/metrics/config-metricsmanager.h>
-#include <vespa/searchcommon/common/schemaconfigurer.h>
-#include <vespa/searchcore/proton/common/alloc_config.h>
-#include <vespa/searchcore/proton/common/hw_info.h>
-#include <vespa/searchcore/proton/matching/querylimiter.h>
-#include <vespa/searchcore/proton/metrics/metricswireservice.h>
-#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
-#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
-#include <vespa/searchcore/proton/server/bootstrapconfig.h>
-#include <vespa/searchcore/proton/server/document_db_maintenance_config.h>
-#include <vespa/searchcore/proton/server/documentdb.h>
-#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
-#include <vespa/searchcore/proton/server/fileconfigmanager.h>
-#include <vespa/searchcore/proton/server/memoryconfigstore.h>
-#include <vespa/searchcore/proton/server/persistencehandlerproxy.h>
-#include <vespa/searchcore/proton/server/threading_service_config.h>
-#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/searchcore/bmcluster/bm_cluster.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_controller.h>
+#include <vespa/searchcore/bmcluster/bm_cluster_params.h>
+#include <vespa/searchcore/bmcluster/bm_feed.h>
+#include <vespa/searchcore/bmcluster/bm_node.h>
+#include <vespa/searchcore/bmcluster/bm_range.h>
+#include <vespa/searchcore/bmcluster/bucket_selector.h>
+#include <vespa/searchcore/bmcluster/spi_bm_feed_handler.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
-#include <vespa/searchlib/transactionlog/translogserver.h>
-#include <vespa/searchsummary/config/config-juniperrc.h>
-#include <vespa/slobrok/sbmirror.h>
-#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
-#include <vespa/storage/common/i_storage_chain_builder.h>
-#include <vespa/storage/config/config-stor-bouncer.h>
-#include <vespa/storage/config/config-stor-communicationmanager.h>
-#include <vespa/storage/config/config-stor-distributormanager.h>
-#include <vespa/storage/config/config-stor-opslogger.h>
-#include <vespa/storage/config/config-stor-prioritymapping.h>
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/config/config-stor-status.h>
-#include <vespa/storage/config/config-stor-visitordispatcher.h>
-#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
-#include <vespa/storage/visiting/config-stor-visitor.h>
-#include <vespa/storageserver/app/distributorprocess.h>
-#include <vespa/storageserver/app/servicelayerprocess.h>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <getopt.h>
#include <iostream>
#include <thread>
@@ -82,82 +27,30 @@
#include <vespa/log/log.h>
LOG_SETUP("vespa-feed-bm");
-using namespace cloud::config::filedistribution;
-using namespace config;
using namespace proton;
using namespace std::chrono_literals;
-using namespace vespa::config::search::core;
-using namespace vespa::config::search::summary;
-using namespace vespa::config::search;
-using vespa::config::content::PersistenceConfigBuilder;
-using vespa::config::content::StorDistributionConfigBuilder;
-using vespa::config::content::StorFilestorConfigBuilder;
-using vespa::config::content::UpgradingConfigBuilder;
-using vespa::config::content::core::BucketspacesConfig;
-using vespa::config::content::core::BucketspacesConfigBuilder;
-using vespa::config::content::core::StorBouncerConfigBuilder;
-using vespa::config::content::core::StorBucketInitConfigBuilder;
-using vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
-using vespa::config::content::core::StorDistributormanagerConfigBuilder;
-using vespa::config::content::core::StorOpsloggerConfigBuilder;
-using vespa::config::content::core::StorPrioritymappingConfigBuilder;
-using vespa::config::content::core::StorServerConfigBuilder;
-using vespa::config::content::core::StorStatusConfigBuilder;
-using vespa::config::content::core::StorVisitorConfigBuilder;
-using vespa::config::content::core::StorVisitordispatcherConfigBuilder;
-using cloud::config::SlobroksConfigBuilder;
-using messagebus::MessagebusConfigBuilder;
-using metrics::MetricsmanagerConfigBuilder;
-using config::ConfigContext;
-using config::ConfigSet;
-using config::ConfigUri;
-using config::IConfigContext;
-using document::AssignValueUpdate;
-using document::BucketId;
-using document::BucketSpace;
-using document::Document;
-using document::DocumentId;
-using document::DocumentType;
using document::DocumentTypeRepo;
using document::DocumentTypeRepoFactory;
-using document::DocumentUpdate;
using document::DocumenttypesConfig;
using document::DocumenttypesConfigBuilder;
-using document::Field;
-using document::FieldSetRepo;
-using document::FieldUpdate;
-using document::IntFieldValue;
-using document::test::makeBucketSpace;
-using feedbm::BmClusterController;
-using feedbm::BmMessageBus;
-using feedbm::BmStorageChainBuilder;
-using feedbm::BmStorageLinkContext;
-using feedbm::IBmFeedHandler;
-using feedbm::DocumentApiMessageBusBmFeedHandler;
-using feedbm::SpiBmFeedHandler;
-using feedbm::StorageApiChainBmFeedHandler;
-using feedbm::StorageApiMessageBusBmFeedHandler;
-using feedbm::StorageApiRpcBmFeedHandler;
-using search::TuneFileDocumentDB;
+using search::bmcluster::BmClusterController;
+using search::bmcluster::IBmFeedHandler;
+using search::bmcluster::BmClusterParams;
+using search::bmcluster::BmCluster;
+using search::bmcluster::BmFeed;
+using search::bmcluster::BmNode;
+using search::bmcluster::BmRange;
+using search::bmcluster::BucketSelector;
using search::index::DummyFileHeaderContext;
-using search::index::Schema;
-using search::index::SchemaBuilder;
-using search::transactionlog::TransLogServer;
-using storage::rpc::SharedRpcResources;
-using storage::rpc::StorageApiRpcService;
using storage::spi::PersistenceProvider;
-using vespalib::compression::CompressionConfig;
using vespalib::makeLambdaTask;
-using proton::ThreadingServiceConfig;
-
-using DocumentDBMap = std::map<DocTypeName, std::shared_ptr<DocumentDB>>;
namespace {
vespalib::string base_dir = "testdb";
-std::shared_ptr<DocumenttypesConfig> make_document_type() {
+std::shared_ptr<DocumenttypesConfig> make_document_types() {
using Struct = document::config_builder::Struct;
using DataType = document::DataType;
document::config_builder::DocumenttypesConfigBuilderHelper builder;
@@ -165,130 +58,14 @@ std::shared_ptr<DocumenttypesConfig> make_document_type() {
return std::make_shared<DocumenttypesConfig>(builder.config());
}
-std::shared_ptr<AttributesConfig> make_attributes_config() {
- AttributesConfigBuilder builder;
- AttributesConfig::Attribute attribute;
- attribute.name = "int";
- attribute.datatype = AttributesConfig::Attribute::Datatype::INT32;
- builder.attribute.emplace_back(attribute);
- return std::make_shared<AttributesConfig>(builder);
-}
-
-std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name)
-{
- auto indexschema = std::make_shared<IndexschemaConfig>();
- auto attributes = make_attributes_config();
- auto summary = std::make_shared<SummaryConfig>();
- std::shared_ptr<Schema> schema(new Schema());
- SchemaBuilder::build(*indexschema, *schema);
- SchemaBuilder::build(*attributes, *schema);
- SchemaBuilder::build(*summary, *schema);
- return std::make_shared<DocumentDBConfig>(
- 1,
- std::make_shared<RankProfilesConfig>(),
- std::make_shared<matching::RankingConstants>(),
- std::make_shared<matching::RankingExpressions>(),
- std::make_shared<matching::OnnxModels>(),
- indexschema,
- attributes,
- summary,
- std::make_shared<SummarymapConfig>(),
- std::make_shared<JuniperrcConfig>(),
- document_types,
- repo,
- std::make_shared<ImportedFieldsConfig>(),
- std::make_shared<TuneFileDocumentDB>(),
- schema,
- std::make_shared<DocumentDBMaintenanceConfig>(),
- search::LogDocumentStore::Config(),
- std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)),
- std::make_shared<const AllocConfig>(),
- "client",
- doc_type_name.getName());
-}
-
-void
-make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
-{
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
-}
-
-void
-make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces)
-{
- BucketspacesConfigBuilder::Documenttype bucket_space_map;
- bucket_space_map.name = "test";
- bucket_space_map.bucketspace = "default";
- bucketspaces.documenttype.emplace_back(std::move(bucket_space_map));
-}
-
-class MyPersistenceEngineOwner : public IPersistenceEngineOwner
-{
- void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { }
-};
-
-struct MyResourceWriteFilter : public IResourceWriteFilter
-{
- bool acceptWriteOperation() const override { return true; }
- State getAcceptState() const override { return IResourceWriteFilter::State(); }
-};
-
-class BucketSelector
-{
- uint32_t _thread_id;
- uint32_t _threads;
- uint32_t _num_buckets;
-public:
- BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in)
- : _thread_id(thread_id_in),
- _threads(threads_in),
- _num_buckets((num_buckets_in / _threads) * _threads)
- {
- }
- uint64_t operator()(uint32_t i) const {
- return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
- }
-};
-
-class BMRange
-{
- uint32_t _start;
- uint32_t _end;
-public:
- BMRange(uint32_t start_in, uint32_t end_in)
- : _start(start_in),
- _end(end_in)
- {
- }
- uint32_t get_start() const { return _start; }
- uint32_t get_end() const { return _end; }
-};
-
-class BMParams {
+class BMParams : public BmClusterParams {
uint32_t _documents;
uint32_t _client_threads;
uint32_t _get_passes;
- vespalib::string _indexing_sequencer;
uint32_t _put_passes;
uint32_t _update_passes;
uint32_t _remove_passes;
- uint32_t _rpc_network_threads;
- uint32_t _rpc_events_before_wakeup;
- uint32_t _rpc_targets_per_node;
- uint32_t _response_threads;
uint32_t _max_pending;
- bool _enable_distributor;
- bool _enable_service_layer;
- bool _skip_get_spi_bucket_info;
- bool _use_document_api;
- bool _use_message_bus;
- bool _use_storage_chain;
- bool _use_async_message_handling_on_schedule;
- uint32_t _bucket_db_stripe_bits;
- uint32_t _distributor_stripes;
- bool _skip_communicationmanager_thread;
uint32_t get_start(uint32_t thread_id) const {
return (_documents / _client_threads) * thread_id + std::min(thread_id, _documents % _client_threads);
}
@@ -297,82 +74,39 @@ public:
: _documents(160000),
_client_threads(1),
_get_passes(0),
- _indexing_sequencer(),
_put_passes(2),
_update_passes(1),
_remove_passes(2),
- _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def
- _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def
- _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def
- _response_threads(2), // Same default as in stor-filestor.def
- _max_pending(1000),
- _enable_distributor(false),
- _enable_service_layer(false),
- _skip_get_spi_bucket_info(false),
- _use_document_api(false),
- _use_message_bus(false),
- _use_storage_chain(false),
- _use_async_message_handling_on_schedule(false),
- _bucket_db_stripe_bits(0),
- _distributor_stripes(0),
- _skip_communicationmanager_thread(false) // Same default as in stor-communicationmanager.def
+ _max_pending(1000)
{
}
- BMRange get_range(uint32_t thread_id) const {
- return BMRange(get_start(thread_id), get_start(thread_id + 1));
+ BmRange get_range(uint32_t thread_id) const {
+ return BmRange(get_start(thread_id), get_start(thread_id + 1));
}
uint32_t get_documents() const { return _documents; }
uint32_t get_max_pending() const { return _max_pending; }
uint32_t get_client_threads() const { return _client_threads; }
uint32_t get_get_passes() const { return _get_passes; }
- const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
uint32_t get_put_passes() const { return _put_passes; }
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
- uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
- uint32_t get_rpc_events_before_wakup() const { return _rpc_events_before_wakeup; }
- uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; }
- uint32_t get_response_threads() const { return _response_threads; }
- bool get_enable_distributor() const { return _enable_distributor; }
- bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; }
- bool get_use_document_api() const { return _use_document_api; }
- bool get_use_message_bus() const { return _use_message_bus; }
- bool get_use_storage_chain() const { return _use_storage_chain; }
- bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; }
- uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
- uint32_t get_distributor_stripes() const { return _distributor_stripes; }
- bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
void set_max_pending(uint32_t max_pending_in) { _max_pending = max_pending_in; }
void set_client_threads(uint32_t threads_in) { _client_threads = threads_in; }
void set_get_passes(uint32_t get_passes_in) { _get_passes = get_passes_in; }
- void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
void set_put_passes(uint32_t put_passes_in) { _put_passes = put_passes_in; }
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
- void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
- void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
- void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; }
- void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
- void set_enable_distributor(bool value) { _enable_distributor = value; }
- void set_enable_service_layer(bool value) { _enable_service_layer = value; }
- void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; }
- void set_use_document_api(bool value) { _use_document_api = value; }
- void set_use_message_bus(bool value) { _use_message_bus = value; }
- void set_use_storage_chain(bool value) { _use_storage_chain = value; }
- void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; }
- void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
- void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; }
- void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; }
bool check() const;
- bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
- bool needs_distributor() const { return _enable_distributor || _use_document_api; }
- bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
+ bool needs_message_bus() const { return get_use_message_bus() || get_use_document_api(); }
};
bool
BMParams::check() const
{
+ if (!BmClusterParams::check()) {
+ return false;
+ }
if (_client_threads < 1) {
std::cerr << "Too few client threads: " << _client_threads << std::endl;
return false;
@@ -389,533 +123,82 @@ BMParams::check() const
std::cerr << "Put passes too low: " << _put_passes << std::endl;
return false;
}
- if (_rpc_network_threads < 1) {
- std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
- return false;
- }
- if (_rpc_targets_per_node < 1) {
- std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl;
- return false;
- }
- if (_response_threads < 1) {
- std::cerr << "Too few response threads: " << _response_threads << std::endl;
- return false;
- }
return true;
}
-class MyServiceLayerProcess : public storage::ServiceLayerProcess {
- PersistenceProvider& _provider;
-
-public:
- MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider,
- std::unique_ptr<storage::IStorageChainBuilder> chain_builder);
- ~MyServiceLayerProcess() override { shutdown(); }
-
- void shutdown() override;
- void setupProvider() override;
- PersistenceProvider& getProvider() override;
-};
-
-MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri & configUri,
- PersistenceProvider &provider,
- std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
- : ServiceLayerProcess(configUri),
- _provider(provider)
-{
- if (chain_builder) {
- set_storage_chain_builder(std::move(chain_builder));
- }
-}
-
-void
-MyServiceLayerProcess::shutdown()
-{
- ServiceLayerProcess::shutdown();
-}
-
-void
-MyServiceLayerProcess::setupProvider()
-{
-}
-
-PersistenceProvider&
-MyServiceLayerProcess::getProvider()
-{
- return _provider;
-}
-
-struct MyStorageConfig
-{
- vespalib::string config_id;
- DocumenttypesConfigBuilder documenttypes;
- StorDistributionConfigBuilder stor_distribution;
- StorBouncerConfigBuilder stor_bouncer;
- StorCommunicationmanagerConfigBuilder stor_communicationmanager;
- StorOpsloggerConfigBuilder stor_opslogger;
- StorPrioritymappingConfigBuilder stor_prioritymapping;
- UpgradingConfigBuilder upgrading;
- StorServerConfigBuilder stor_server;
- StorStatusConfigBuilder stor_status;
- BucketspacesConfigBuilder bucketspaces;
- MetricsmanagerConfigBuilder metricsmanager;
- SlobroksConfigBuilder slobroks;
- MessagebusConfigBuilder messagebus;
-
- MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : config_id(config_id_in),
- documenttypes(documenttypes_in),
- stor_distribution(),
- stor_bouncer(),
- stor_communicationmanager(),
- stor_opslogger(),
- stor_prioritymapping(),
- upgrading(),
- stor_server(),
- stor_status(),
- bucketspaces(),
- metricsmanager(),
- slobroks(),
- messagebus()
- {
- {
- auto &dc = stor_distribution;
- {
- StorDistributionConfigBuilder::Group group;
- {
- StorDistributionConfigBuilder::Group::Nodes node;
- node.index = 0;
- group.nodes.push_back(std::move(node));
- }
- group.index = "invalid";
- group.name = "invalid";
- group.capacity = 1.0;
- group.partitions = "";
- dc.group.push_back(std::move(group));
- }
- dc.redundancy = 1;
- dc.readyCopies = 1;
- }
- stor_server.isDistributor = distributor;
- stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
- if (distributor) {
- stor_server.rootFolder = "distributor";
- } else {
- stor_server.rootFolder = "storage";
- }
- make_slobroks_config(slobroks, slobrok_port);
- stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
- stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakup();
- stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node();
- stor_communicationmanager.mbusport = mbus_port;
- stor_communicationmanager.rpcport = rpc_port;
- stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread();
-
- stor_status.httpport = status_port;
- make_bucketspaces_config(bucketspaces);
- }
-
- ~MyStorageConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &documenttypes);
- set.addBuilder(config_id, &stor_distribution);
- set.addBuilder(config_id, &stor_bouncer);
- set.addBuilder(config_id, &stor_communicationmanager);
- set.addBuilder(config_id, &stor_opslogger);
- set.addBuilder(config_id, &stor_prioritymapping);
- set.addBuilder(config_id, &upgrading);
- set.addBuilder(config_id, &stor_server);
- set.addBuilder(config_id, &stor_status);
- set.addBuilder(config_id, &bucketspaces);
- set.addBuilder(config_id, &metricsmanager);
- set.addBuilder(config_id, &slobroks);
- set.addBuilder(config_id, &messagebus);
- }
-};
-
-MyStorageConfig::~MyStorageConfig() = default;
-
-struct MyServiceLayerConfig : public MyStorageConfig
-{
- PersistenceConfigBuilder persistence;
- StorFilestorConfigBuilder stor_filestor;
- StorBucketInitConfigBuilder stor_bucket_init;
- StorVisitorConfigBuilder stor_visitor;
-
- MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
- persistence(),
- stor_filestor(),
- stor_bucket_init(),
- stor_visitor()
- {
- stor_filestor.numResponseThreads = params.get_response_threads();
- stor_filestor.numNetworkThreads = params.get_rpc_network_threads();
- stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule();
- }
-
- ~MyServiceLayerConfig();
-
- void add_builders(ConfigSet &set) {
- MyStorageConfig::add_builders(set);
- set.addBuilder(config_id, &persistence);
- set.addBuilder(config_id, &stor_filestor);
- set.addBuilder(config_id, &stor_bucket_init);
- set.addBuilder(config_id, &stor_visitor);
- }
-};
-
-MyServiceLayerConfig::~MyServiceLayerConfig() = default;
-
-struct MyDistributorConfig : public MyStorageConfig
-{
- StorDistributormanagerConfigBuilder stor_distributormanager;
- StorVisitordispatcherConfigBuilder stor_visitordispatcher;
-
- MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
- int slobrok_port, int mbus_port, int rpc_port, int status_port, const BMParams& params)
- : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
- stor_distributormanager(),
- stor_visitordispatcher()
- {
- stor_distributormanager.numDistributorStripes = params.get_distributor_stripes();
- }
-
- ~MyDistributorConfig();
-
- void add_builders(ConfigSet &set) {
- MyStorageConfig::add_builders(set);
- set.addBuilder(config_id, &stor_distributormanager);
- set.addBuilder(config_id, &stor_visitordispatcher);
- }
-};
-
-MyDistributorConfig::~MyDistributorConfig() = default;
-
-struct MyRpcClientConfig {
- vespalib::string config_id;
- SlobroksConfigBuilder slobroks;
-
- MyRpcClientConfig(const vespalib::string &config_id_in, int slobrok_port)
- : config_id(config_id_in),
- slobroks()
- {
- make_slobroks_config(slobroks, slobrok_port);
- }
- ~MyRpcClientConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &slobroks);
- }
-};
-
-MyRpcClientConfig::~MyRpcClientConfig() = default;
-
-struct MyMessageBusConfig {
- vespalib::string config_id;
- SlobroksConfigBuilder slobroks;
- MessagebusConfigBuilder messagebus;
-
- MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port)
- : config_id(config_id_in),
- slobroks(),
- messagebus()
- {
- make_slobroks_config(slobroks, slobrok_port);
- }
- ~MyMessageBusConfig();
-
- void add_builders(ConfigSet &set) {
- set.addBuilder(config_id, &slobroks);
- set.addBuilder(config_id, &messagebus);
- }
-};
-
-MyMessageBusConfig::~MyMessageBusConfig() = default;
-
}
struct PersistenceProviderFixture {
- std::shared_ptr<DocumenttypesConfig> _document_types;
+ std::shared_ptr<const DocumenttypesConfig> _document_types;
std::shared_ptr<const DocumentTypeRepo> _repo;
- DocTypeName _doc_type_name;
- const DocumentType* _document_type;
- const Field& _field;
- std::shared_ptr<DocumentDBConfig> _document_db_config;
- vespalib::string _base_dir;
- DummyFileHeaderContext _file_header_context;
- int _tls_listen_port;
- int _slobrok_port;
- int _rpc_client_port;
- int _service_layer_mbus_port;
- int _service_layer_rpc_port;
- int _service_layer_status_port;
- int _distributor_mbus_port;
- int _distributor_rpc_port;
- int _distributor_status_port;
- TransLogServer _tls;
- vespalib::string _tls_spec;
- matching::QueryLimiter _query_limiter;
- vespalib::Clock _clock;
- DummyWireService _metrics_wire_service;
- MemoryConfigStores _config_stores;
- vespalib::ThreadStackExecutor _summary_executor;
- DummyDBOwner _document_db_owner;
- BucketSpace _bucket_space;
- std::shared_ptr<DocumentDB> _document_db;
- MyPersistenceEngineOwner _persistence_owner;
- MyResourceWriteFilter _write_filter;
- test::DiskMemUsageNotifier _disk_mem_usage_notifier;
- std::shared_ptr<PersistenceEngine> _persistence_engine;
- std::unique_ptr<const FieldSetRepo> _field_set_repo;
- uint32_t _bucket_bits;
- MyServiceLayerConfig _service_layer_config;
- MyDistributorConfig _distributor_config;
- MyRpcClientConfig _rpc_client_config;
- MyMessageBusConfig _message_bus_config;
- ConfigSet _config_set;
- std::shared_ptr<IConfigContext> _config_context;
- std::unique_ptr<IBmFeedHandler> _feed_handler;
- std::unique_ptr<mbus::Slobrok> _slobrok;
- std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
- std::unique_ptr<MyServiceLayerProcess> _service_layer;
- std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
- std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
- std::unique_ptr<storage::DistributorProcess> _distributor;
- std::unique_ptr<BmMessageBus> _message_bus;
+ std::unique_ptr<BmCluster> _bm_cluster;
+ std::unique_ptr<BmNode> _bm_node;
+ BmFeed _feed;
+ IBmFeedHandler* _feed_handler;
explicit PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
- void create_document_db(const BMParams & params);
- uint32_t num_buckets() const { return (1u << _bucket_bits); }
- BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); }
- document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); }
- DocumentId make_document_id(uint32_t n, uint32_t i) const;
- std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const;
- std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const;
void create_buckets();
void wait_slobrok(const vespalib::string &name);
- void start_service_layer(const BMParams& params);
- void start_distributor(const BMParams& params);
+ void start_service_layer(const BmClusterParams& params);
+ void start_distributor(const BmClusterParams& params);
void start_message_bus();
- void create_feed_handler(const BMParams& params);
+ void create_feed_handler(const BmClusterParams& params);
void shutdown_feed_handler();
void shutdown_message_bus();
void shutdown_distributor();
void shutdown_service_layer();
+ PersistenceProvider* get_persistence_provider() { return _bm_node->get_persistence_provider(); }
};
PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
- : _document_types(make_document_type()),
- _repo(DocumentTypeRepoFactory::make(*_document_types)),
- _doc_type_name("test"),
- _document_type(_repo->getDocumentType(_doc_type_name.getName())),
- _field(_document_type->getField("int")),
- _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)),
- _base_dir(base_dir),
- _file_header_context(),
- _tls_listen_port(9017),
- _slobrok_port(9018),
- _rpc_client_port(9019),
- _service_layer_mbus_port(9020),
- _service_layer_rpc_port(9021),
- _service_layer_status_port(9022),
- _distributor_mbus_port(9023),
- _distributor_rpc_port(9024),
- _distributor_status_port(9025),
- _tls("tls", _tls_listen_port, _base_dir, _file_header_context),
- _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
- _query_limiter(),
- _clock(),
- _metrics_wire_service(),
- _config_stores(),
- _summary_executor(8, 128_Ki),
- _document_db_owner(),
- _bucket_space(makeBucketSpace(_doc_type_name.getName())),
- _document_db(),
- _persistence_owner(),
- _write_filter(),
- _disk_mem_usage_notifier(),
- _persistence_engine(),
- _field_set_repo(std::make_unique<const FieldSetRepo>(*_repo)),
- _bucket_bits(16),
- _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
- _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
- _rpc_client_config("bm-rpc-client", _slobrok_port),
- _message_bus_config("bm-message-bus", _slobrok_port),
- _config_set(),
- _config_context(std::make_shared<ConfigContext>(_config_set)),
- _feed_handler(),
- _slobrok(),
- _service_layer_chain_context(),
- _service_layer(),
- _rpc_client_shared_rpc_resources(),
- _distributor_chain_context(),
- _distributor(),
- _message_bus()
-{
- create_document_db(params);
- _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false);
- auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db);
- _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
- _service_layer_config.add_builders(_config_set);
- _distributor_config.add_builders(_config_set);
- _rpc_client_config.add_builders(_config_set);
- _message_bus_config.add_builders(_config_set);
- _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info());
-}
-
-PersistenceProviderFixture::~PersistenceProviderFixture()
-{
- if (_persistence_engine) {
- _persistence_engine->destroyIterators();
- _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name);
- }
- if (_document_db) {
- _document_db->close();
- }
-}
-
-void
-PersistenceProviderFixture::create_document_db(const BMParams & params)
-{
- vespalib::mkdir(_base_dir, false);
- vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false);
- vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig";
- {
- FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName());
- fileCfg.saveConfig(*_document_db_config, 1);
- }
- config::DirSpec spec(input_cfg + "/config-1");
- auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>();
- DocumentDBConfigHelper mgr(spec, _doc_type_name.getName());
- auto protonCfg = std::make_shared<ProtonConfigBuilder>();
- if ( ! params.get_indexing_sequencer().empty()) {
- vespalib::string sequencer = params.get_indexing_sequencer();
- std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); });
- protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer);
- }
- auto bootstrap_config = std::make_shared<BootstrapConfig>(1,
- _document_types,
- _repo,
- std::move(protonCfg),
- std::make_shared<FiledistributorrpcConfig>(),
- std::make_shared<BucketspacesConfig>(),
- tuneFileDocDB, HwInfo());
- mgr.forwardConfig(bootstrap_config);
- mgr.nextGeneration(0ms);
- _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name,
- _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner,
- _summary_executor, _summary_executor, *_persistence_engine, _tls,
- _metrics_wire_service, _file_header_context,
- _config_stores.getConfigStore(_doc_type_name.toString()),
- std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo());
- _document_db->start();
- _document_db->waitForOnlineState();
-}
-
-DocumentId
-PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const
+ : _document_types(make_document_types()),
+ _repo(document::DocumentTypeRepoFactory::make(*_document_types)),
+ _bm_cluster(std::make_unique<BmCluster>(params, _repo)),
+ _bm_node(BmNode::create(params, _document_types)),
+ _feed(_repo),
+ _feed_handler(nullptr)
{
- DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i));
- return id;
}
-std::unique_ptr<Document>
-PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const
-{
- auto id = make_document_id(n, i);
- auto document = std::make_unique<Document>(*_document_type, id);
- document->setRepo(*_repo);
- document->setFieldValue(_field, std::make_unique<IntFieldValue>(i));
- return document;
-}
-
-std::unique_ptr<DocumentUpdate>
-PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const
-{
- auto id = make_document_id(n, i);
- auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id);
- document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15))));
- return document_update;
-}
+PersistenceProviderFixture::~PersistenceProviderFixture() = default;
void
PersistenceProviderFixture::create_buckets()
{
- SpiBmFeedHandler feed_handler(*_persistence_engine, *_field_set_repo, false);
- for (unsigned int i = 0; i < num_buckets(); ++i) {
- feed_handler.create_bucket(make_bucket(i));
+ auto feed_handler = _bm_node->make_create_bucket_feed_handler(false);
+ for (unsigned int i = 0; i < _feed.num_buckets(); ++i) {
+ feed_handler->create_bucket(_feed.make_bucket(i));
}
}
void
PersistenceProviderFixture::wait_slobrok(const vespalib::string &name)
{
- auto &mirror = _rpc_client_shared_rpc_resources->slobrok_mirror();
- LOG(info, "Waiting for %s in slobrok", name.c_str());
- for (;;) {
- auto specs = mirror.lookup(name);
- if (!specs.empty()) {
- LOG(info, "Found %s in slobrok", name.c_str());
- return;
- }
- std::this_thread::sleep_for(100ms);
- }
+ _bm_cluster->wait_slobrok(name);
}
void
-PersistenceProviderFixture::start_service_layer(const BMParams& params)
+PersistenceProviderFixture::start_service_layer(const BmClusterParams& params)
{
- LOG(info, "start slobrok");
- _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
- LOG(info, "start service layer");
- config::ConfigUri config_uri("bm-servicelayer", _config_context);
- std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.needs_distributor()) {
- chain_builder = std::make_unique<BmStorageChainBuilder>();
- _service_layer_chain_context = chain_builder->get_context();
- }
- _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
- *_persistence_engine,
- std::move(chain_builder));
- _service_layer->setupConfig(100ms);
- _service_layer->createNode();
- _service_layer->getNode().waitUntilInitialized();
- LOG(info, "start rpc client shared resources");
- config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
- _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>
- (client_config_uri, _rpc_client_port, 100, params.get_rpc_events_before_wakup());
- _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
+ _bm_cluster->start_slobrok();
+ _bm_node->start_service_layer(params);
+ _bm_node->wait_service_layer();
+ _bm_cluster->start_rpc_client();
wait_slobrok("storage/cluster.storage/storage/0/default");
wait_slobrok("storage/cluster.storage/storage/0");
- BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ BmClusterController fake_controller(_bm_cluster->get_rpc_client());
fake_controller.set_cluster_up(false);
}
void
-PersistenceProviderFixture::start_distributor(const BMParams& params)
+PersistenceProviderFixture::start_distributor(const BmClusterParams& params)
{
- config::ConfigUri config_uri("bm-distributor", _config_context);
- std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.get_use_document_api()) {
- chain_builder = std::make_unique<BmStorageChainBuilder>();
- _distributor_chain_context = chain_builder->get_context();
- }
- _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
- if (chain_builder) {
- _distributor->set_storage_chain_builder(std::move(chain_builder));
- }
- _distributor->setupConfig(100ms);
- _distributor->createNode();
+ _bm_node->start_distributor(params);
wait_slobrok("storage/cluster.storage/distributor/0/default");
wait_slobrok("storage/cluster.storage/distributor/0");
- BmClusterController fake_controller(*_rpc_client_shared_rpc_resources);
+ BmClusterController fake_controller(_bm_cluster->get_rpc_client());
fake_controller.set_cluster_up(true);
// Wait for bucket ownership transfer safe time
std::this_thread::sleep_for(2s);
@@ -924,102 +207,45 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
void
PersistenceProviderFixture::start_message_bus()
{
- config::ConfigUri config_uri("bm-message-bus", _config_context);
- LOG(info, "Starting message bus");
- _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo);
- LOG(info, "Started message bus");
+ _bm_cluster->start_message_bus();
}
void
-PersistenceProviderFixture::create_feed_handler(const BMParams& params)
+PersistenceProviderFixture::create_feed_handler(const BmClusterParams& params)
{
- StorageApiRpcService::Params rpc_params;
- // This is the same compression config as the default in stor-communicationmanager.def.
- rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
- rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node();
- if (params.get_use_document_api()) {
- _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus);
- } else if (params.get_enable_distributor()) {
- if (params.get_use_storage_chain()) {
- assert(_distributor_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true);
- }
- } else if (params.needs_service_layer()) {
- if (params.get_use_storage_chain()) {
- assert(_service_layer_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false);
- }
- }
+ _bm_node->create_feed_handler(params, *_bm_cluster);
+ _feed_handler = _bm_node->get_feed_handler();
}
void
PersistenceProviderFixture::shutdown_feed_handler()
{
- _feed_handler.reset();
+ _bm_node->shutdown_feed_handler();
+ _feed_handler = nullptr;
}
void
PersistenceProviderFixture::shutdown_message_bus()
{
- if (_message_bus) {
- LOG(info, "stop message bus");
- _message_bus.reset();
- }
+ _bm_cluster->stop_message_bus();
}
void
PersistenceProviderFixture::shutdown_distributor()
{
- if (_distributor) {
- LOG(info, "stop distributor");
- _distributor->getNode().requestShutdown("controlled shutdown");
- _distributor->shutdown();
- }
+ _bm_node->shutdown_distributor();
}
void
PersistenceProviderFixture::shutdown_service_layer()
{
- if (_rpc_client_shared_rpc_resources) {
- LOG(info, "stop rpc client shared resources");
- _rpc_client_shared_rpc_resources->shutdown();
- _rpc_client_shared_rpc_resources.reset();
- }
- if (_service_layer) {
- LOG(info, "stop service layer");
- _service_layer->getNode().requestShutdown("controlled shutdown");
- _service_layer->shutdown();
- }
- if (_slobrok) {
- LOG(info, "stop slobrok");
- _slobrok.reset();
- }
-}
-
-vespalib::nbostream
-make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document = f.make_document(n, i);
- document->serialize(serialized_feed);
- }
- return serialized_feed;
+ _bm_cluster->stop_rpc_client();
+ _bm_node->shutdown_service_layer();
+ _bm_cluster->stop_slobrok();
}
std::vector<vespalib::nbostream>
-make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
+make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BmRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label)
{
LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents());
std::vector<vespalib::nbostream> serialized_feed_v;
@@ -1038,27 +264,6 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st
return serialized_feed_v;
}
-void
-put_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- auto &repo = *f._repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- auto document = std::make_unique<Document>(repo, is);
- f._feed_handler->put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
class AvgSampler {
private:
double _total;
@@ -1077,73 +282,42 @@ void
run_put_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { put_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.put_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "putAsync: pass=%u, errors=%u, puts/s: %8.2f", pass, new_errors, throughput);
time_bias += bm_params.get_documents();
}
-vespalib::nbostream
-make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document_update = f.make_document_update(n, i);
- document_update->serializeHEAD(serialized_feed);
- }
- return serialized_feed;
-}
-
-void
-update_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- auto &repo = *f._repo;
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- auto document_update = DocumentUpdate::createHEAD(repo, is);
- f._feed_handler->update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
void
run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { update_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.update_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "updateAsync: pass=%u, errors=%u, updates/s: %8.2f", pass, new_errors, throughput);
@@ -1151,94 +325,44 @@ run_update_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecu
}
void
-get_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed)
-{
- LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- vespalib::string all_fields(document::AllFields::NAME);
- auto bucket_space = f._bucket_space;
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- DocumentId document_id(is);
- f._feed_handler->get(bucket, all_fields, document_id, pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
-void
run_get_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
- { get_async_task(f, max_pending, range, serialized_feed); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range]()
+ { feed.get_async_task(feed_handler, max_pending, range, serialized_feed); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "getAsync: pass=%u, errors=%u, gets/s: %8.2f", pass, new_errors, throughput);
}
-vespalib::nbostream
-make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector)
-{
- vespalib::nbostream serialized_feed;
- LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- auto n = bucket_selector(i);
- serialized_feed << f.make_bucket_id(n);
- auto document_id = f.make_document_id(n, i);
- vespalib::string raw_id = document_id.toString();
- serialized_feed.write(raw_id.c_str(), raw_id.size() + 1);
- }
- return serialized_feed;
-}
-
-void
-remove_async_task(PersistenceProviderFixture &f, uint32_t max_pending, BMRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
-{
- LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
- feedbm::PendingTracker pending_tracker(max_pending);
- f._feed_handler->attach_bucket_info_queue(pending_tracker);
- vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- BucketId bucket_id;
- auto bucket_space = f._bucket_space;
- bool use_timestamp = !f._feed_handler->manages_timestamp();
- for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
- is >> bucket_id;
- document::Bucket bucket(bucket_space, bucket_id);
- DocumentId document_id(is);
- f._feed_handler->remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
- }
- assert(is.empty());
- pending_tracker.drain();
-}
-
void
run_remove_async_tasks(PersistenceProviderFixture& f, vespalib::ThreadStackExecutor& executor, int pass, int64_t& time_bias,
const std::vector<vespalib::nbostream>& serialized_feed_v, const BMParams& bm_params, AvgSampler& sampler)
{
- uint32_t old_errors = f._feed_handler->get_error_count();
+ auto& feed = f._feed;
+ auto& feed_handler = *f._feed_handler;
+ uint32_t old_errors = feed_handler.get_error_count();
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < bm_params.get_client_threads(); ++i) {
auto range = bm_params.get_range(i);
- executor.execute(makeLambdaTask([&f, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
- { remove_async_task(f, max_pending, range, serialized_feed, time_bias); }));
+ executor.execute(makeLambdaTask([&feed, &feed_handler, max_pending = bm_params.get_max_pending(), &serialized_feed = serialized_feed_v[i], range, time_bias]()
+ { feed.remove_async_task(feed_handler, max_pending, range, serialized_feed, time_bias); }));
}
executor.sync();
auto end_time = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
- uint32_t new_errors = f._feed_handler->get_error_count() - old_errors;
+ uint32_t new_errors = feed_handler.get_error_count() - old_errors;
double throughput = bm_params.get_documents() / elapsed.count();
sampler.sample(throughput);
LOG(info, "removeAsync: pass=%u, errors=%u, removes/s: %8.2f", pass, new_errors, throughput);
@@ -1310,10 +434,10 @@ void benchmark_async_spi(const BMParams &bm_params)
{
vespalib::rmdir(base_dir, true);
PersistenceProviderFixture f(bm_params);
- auto &provider = *f._persistence_engine;
+ auto &provider = *f.get_persistence_provider();
LOG(info, "start initialize");
provider.initialize();
- LOG(info, "create %u buckets", f.num_buckets());
+ LOG(info, "create %u buckets", f._feed.num_buckets());
if (!bm_params.needs_distributor()) {
f.create_buckets();
}
@@ -1328,9 +452,10 @@ void benchmark_async_spi(const BMParams &bm_params)
}
f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128_Ki);
- auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put");
- auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update");
- auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove");
+ auto& feed = f._feed;
+ auto put_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_put_feed(range, bucket_selector); }, f._feed.num_buckets(), "put");
+ auto update_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_update_feed(range, bucket_selector); }, f._feed.num_buckets(), "update");
+ auto remove_feed = make_feed(executor, bm_params, [&feed](BmRange range, BucketSelector bucket_selector) { return feed.make_remove_feed(range, bucket_selector); }, f._feed.num_buckets(), "remove");
int64_t time_bias = 1;
LOG(info, "Feed handler is '%s'", f._feed_handler->get_name().c_str());
benchmark_async_put(f, executor, time_bias, put_feed, bm_params);
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
new file mode 100644
index 00000000000..d8e31011122
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -0,0 +1,42 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcore_bmcluster STATIC
+ SOURCES
+ bm_cluster.cpp
+ bm_cluster_controller.cpp
+ bm_cluster_params.cpp
+ bm_feed.cpp
+ bm_message_bus.cpp
+ bm_node.cpp
+ bm_storage_chain_builder.cpp
+ bm_storage_link.cpp
+ bucket_info_queue.cpp
+ document_api_message_bus_bm_feed_handler.cpp
+ pending_tracker.cpp
+ pending_tracker_hash.cpp
+ spi_bm_feed_handler.cpp
+ storage_api_chain_bm_feed_handler.cpp
+ storage_api_message_bus_bm_feed_handler.cpp
+ storage_api_rpc_bm_feed_handler.cpp
+ storage_reply_error_checker.cpp
+ DEPENDS
+ searchcore_server
+ searchcore_initializer
+ searchcore_reprocessing
+ searchcore_index
+ searchcore_persistenceengine
+ searchcore_docsummary
+ searchcore_feedoperation
+ searchcore_matching
+ searchcore_attribute
+ searchcore_documentmetastore
+ searchcore_bucketdb
+ searchcore_flushengine
+ searchcore_pcommon
+ searchcore_grouping
+ searchcore_proton_metrics
+ searchcore_fconfig
+ storageserver_storageapp
+ messagebus_messagebus-test
+ messagebus
+ searchlib_searchlib_uca
+)
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
new file mode 100644
index 00000000000..5a0b05f5c54
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -0,0 +1,180 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_cluster.h"
+#include "bm_message_bus.h"
+#include <vespa/config/common/configcontext.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bmcluster.bm_cluster");
+
+using cloud::config::SlobroksConfigBuilder;
+using config::ConfigSet;
+using messagebus::MessagebusConfigBuilder;
+using storage::rpc::SharedRpcResources;
+
+namespace search::bmcluster {
+
+namespace {
+
+vespalib::string message_bus_config_id("bm-message-bus");
+vespalib::string rpc_client_config_id("bm-rpc-client");
+
+void
+make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
+{
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+}
+
+}
+
+struct BmCluster::MessageBusConfigSet {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ MessageBusConfigSet(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks(),
+ messagebus()
+ {
+ make_slobroks_config(slobroks, slobrok_port);
+ }
+ ~MessageBusConfigSet();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+BmCluster::MessageBusConfigSet::~MessageBusConfigSet() = default;
+
+struct BmCluster::RpcClientConfigSet {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+
+ RpcClientConfigSet(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks()
+ {
+ make_slobroks_config(slobroks, slobrok_port);
+ }
+ ~RpcClientConfigSet();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ }
+};
+
+BmCluster::RpcClientConfigSet::~RpcClientConfigSet() = default;
+
+BmCluster::BmCluster(const BmClusterParams& params, std::shared_ptr<const document::DocumentTypeRepo> repo)
+ : _params(params),
+ _slobrok_port(9018),
+ _rpc_client_port(9019),
+ _message_bus_config(std::make_unique<MessageBusConfigSet>(message_bus_config_id, _slobrok_port)),
+ _rpc_client_config(std::make_unique<RpcClientConfigSet>(rpc_client_config_id, _slobrok_port)),
+ _config_set(std::make_unique<config::ConfigSet>()),
+ _config_context(std::make_shared<config::ConfigContext>(*_config_set)),
+ _slobrok(),
+ _message_bus(),
+ _rpc_client(),
+ _repo(repo)
+
+{
+ _message_bus_config->add_builders(*_config_set);
+ _rpc_client_config->add_builders(*_config_set);
+}
+
+BmCluster::~BmCluster()
+{
+ stop_message_bus();
+ stop_rpc_client();
+ stop_slobrok();
+}
+
+
+void
+BmCluster::start_slobrok()
+{
+ if (!_slobrok) {
+ LOG(info, "start slobrok");
+ _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port);
+ }
+}
+
+void
+BmCluster::stop_slobrok()
+{
+ if (_slobrok) {
+ LOG(info, "stop slobrok");
+ _slobrok.reset();
+ }
+}
+
+void
+BmCluster::wait_slobrok(const vespalib::string &name)
+{
+ auto &mirror = _rpc_client->slobrok_mirror();
+ LOG(info, "Waiting for %s in slobrok", name.c_str());
+ for (;;) {
+ auto specs = mirror.lookup(name);
+ if (!specs.empty()) {
+ LOG(info, "Found %s in slobrok", name.c_str());
+ return;
+ }
+ std::this_thread::sleep_for(100ms);
+ }
+}
+
+void
+BmCluster::start_message_bus()
+{
+ if (!_message_bus) {
+ LOG(info, "Starting message bus");
+ config::ConfigUri config_uri(message_bus_config_id, _config_context);
+ _message_bus = std::make_unique<BmMessageBus>(config_uri, _repo);
+ LOG(info, "Started message bus");
+ }
+}
+
+void
+BmCluster::stop_message_bus()
+{
+ if (_message_bus) {
+ LOG(info, "stop message bus");
+ _message_bus.reset();
+ }
+}
+
+void
+BmCluster::start_rpc_client()
+{
+ if (!_rpc_client) {
+ LOG(info, "start rpc client");
+ config::ConfigUri client_config_uri(rpc_client_config_id, _config_context);
+ _rpc_client = std::make_unique<SharedRpcResources>
+ (client_config_uri, _rpc_client_port, 100, _params.get_rpc_events_before_wakeup());
+ _rpc_client->start_server_and_register_slobrok(rpc_client_config_id);
+ }
+}
+
+void
+BmCluster::stop_rpc_client()
+{
+ if (_rpc_client) {
+ LOG(info, "stop rpc client");
+ _rpc_client->shutdown();
+ _rpc_client.reset();
+ }
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
new file mode 100644
index 00000000000..9a41ec5b415
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
@@ -0,0 +1,55 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+#include "bm_cluster_params.h"
+
+namespace config {
+
+class IConfigContext;
+class ConfigSet;
+
+}
+
+namespace document { class DocumentTypeRepo; }
+namespace mbus { class Slobrok; }
+namespace storage::rpc { class SharedRpcResources; }
+
+namespace search::bmcluster {
+
+class BmMessageBus;
+
+/*
+ * Class representing a benchmark cluster with one or more benchmark nodes.
+ */
+class BmCluster {
+ struct MessageBusConfigSet;
+ struct RpcClientConfigSet;
+ BmClusterParams _params;
+ int _slobrok_port;
+ int _rpc_client_port;
+ std::unique_ptr<MessageBusConfigSet> _message_bus_config;
+ std::unique_ptr<RpcClientConfigSet> _rpc_client_config;
+ std::unique_ptr<config::ConfigSet> _config_set;
+ std::shared_ptr<config::IConfigContext> _config_context;
+ std::unique_ptr<mbus::Slobrok> _slobrok;
+ std::unique_ptr<BmMessageBus> _message_bus;
+ std::unique_ptr<storage::rpc::SharedRpcResources> _rpc_client;
+ std::shared_ptr<const document::DocumentTypeRepo> _repo;
+
+public:
+ BmCluster(const BmClusterParams& params, std::shared_ptr<const document::DocumentTypeRepo> repo);
+ ~BmCluster();
+ void start_slobrok();
+ void stop_slobrok();
+ void wait_slobrok(const vespalib::string &name);
+ void start_message_bus();
+ void stop_message_bus();
+ void start_rpc_client();
+ void stop_rpc_client();
+ storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; }
+ BmMessageBus& get_message_bus() { return *_message_bus; }
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
index a1b40c56e11..8d8f89a6fe7 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
@@ -13,7 +13,7 @@ using storage::api::StorageMessageAddress;
using storage::rpc::SharedRpcResources;
using storage::lib::NodeType;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
index 699036be5c9..d4e26e2f4fd 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
@@ -5,7 +5,7 @@
namespace storage::api { class StorageMessageAddress; }
namespace storage::rpc { class SharedRpcResources; }
-namespace feedbm {
+namespace search::bmcluster {
/*
* Fake cluster controller that sets cluster state to be up.
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
new file mode 100644
index 00000000000..3452ffa8b25
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
@@ -0,0 +1,47 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_cluster_params.h"
+#include <iostream>
+
+namespace search::bmcluster {
+
+BmClusterParams::BmClusterParams()
+ : _bucket_db_stripe_bits(0),
+ _distributor_stripes(0),
+ _enable_distributor(false),
+ _enable_service_layer(false),
+ _indexing_sequencer(),
+ _response_threads(2), // Same default as in stor-filestor.def
+ _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def
+ _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def
+ _rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def
+ _skip_communicationmanager_thread(false), // Same default as in stor-communicationmanager.def
+ _skip_get_spi_bucket_info(false),
+ _use_async_message_handling_on_schedule(false),
+ _use_document_api(false),
+ _use_message_bus(false),
+ _use_storage_chain(false)
+{
+}
+
+BmClusterParams::~BmClusterParams() = default;
+
+bool
+BmClusterParams::check() const
+{
+ if (_response_threads < 1) {
+ std::cerr << "Too few response threads: " << _response_threads << std::endl;
+ return false;
+ }
+ if (_rpc_network_threads < 1) {
+ std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl;
+ return false;
+ }
+ if (_rpc_targets_per_node < 1) {
+ std::cerr << "Too few rpc targets per node: " << _rpc_targets_per_node << std::endl;
+ return false;
+ }
+ return true;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
new file mode 100644
index 00000000000..dcf579452c6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace search::bmcluster {
+
+/*
+ * Parameters for setting up a benchmark cluster.
+ */
+class BmClusterParams
+{
+ uint32_t _bucket_db_stripe_bits;
+ uint32_t _distributor_stripes;
+ bool _enable_distributor;
+ bool _enable_service_layer;
+ vespalib::string _indexing_sequencer;
+ uint32_t _response_threads;
+ uint32_t _rpc_events_before_wakeup;
+ uint32_t _rpc_network_threads;
+ uint32_t _rpc_targets_per_node;
+ bool _skip_communicationmanager_thread;
+ bool _skip_get_spi_bucket_info;
+ bool _use_async_message_handling_on_schedule;
+ bool _use_document_api;
+ bool _use_message_bus;
+ bool _use_storage_chain;
+public:
+ BmClusterParams();
+ ~BmClusterParams();
+ uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
+ uint32_t get_distributor_stripes() const { return _distributor_stripes; }
+ bool get_enable_distributor() const { return _enable_distributor; }
+ const vespalib::string & get_indexing_sequencer() const { return _indexing_sequencer; }
+ uint32_t get_response_threads() const { return _response_threads; }
+ uint32_t get_rpc_events_before_wakeup() const { return _rpc_events_before_wakeup; }
+ uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
+ uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; }
+ bool get_skip_communicationmanager_thread() const { return _skip_communicationmanager_thread; }
+ bool get_skip_get_spi_bucket_info() const { return _skip_get_spi_bucket_info; }
+ bool get_use_async_message_handling_on_schedule() const { return _use_async_message_handling_on_schedule; }
+ bool get_use_document_api() const { return _use_document_api; }
+ bool get_use_message_bus() const { return _use_message_bus; }
+ bool get_use_storage_chain() const { return _use_storage_chain; }
+ bool needs_distributor() const { return _enable_distributor || _use_document_api; }
+ bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
+ void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
+ void set_distributor_stripes(uint32_t value) { _distributor_stripes = value; }
+ void set_enable_distributor(bool value) { _enable_distributor = value; }
+ void set_enable_service_layer(bool value) { _enable_service_layer = value; }
+ void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
+ void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
+ void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
+ void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
+ void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; }
+ void set_skip_communicationmanager_thread(bool value) { _skip_communicationmanager_thread = value; }
+ void set_skip_get_spi_bucket_info(bool value) { _skip_get_spi_bucket_info = value; }
+ void set_use_async_message_handling_on_schedule(bool value) { _use_async_message_handling_on_schedule = value; }
+ void set_use_document_api(bool value) { _use_document_api = value; }
+ void set_use_message_bus(bool value) { _use_message_bus = value; }
+ void set_use_storage_chain(bool value) { _use_storage_chain = value; }
+ bool check() const;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp
new file mode 100644
index 00000000000..91f3f916aad
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.cpp
@@ -0,0 +1,195 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_feed.h"
+#include "bm_range.h"
+#include "bucket_selector.h"
+#include "pending_tracker.h"
+#include "i_bm_feed_handler.h"
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bmcluster.bm_feed");
+
+using document::AssignValueUpdate;
+using document::Document;
+using document::DocumentId;
+using document::DocumentType;
+using document::DocumentTypeRepo;
+using document::DocumentUpdate;
+using document::IntFieldValue;
+using document::FieldUpdate;
+
+namespace search::bmcluster {
+
+BmFeed::BmFeed(std::shared_ptr<const DocumentTypeRepo> repo)
+ : _repo(std::move(repo)),
+ _document_type(_repo->getDocumentType("test")),
+ _field(_document_type->getField("int")),
+ _bucket_bits(16),
+ _bucket_space(document::test::makeBucketSpace("test"))
+{
+}
+
+BmFeed::~BmFeed()
+{
+}
+
+DocumentId
+BmFeed::make_document_id(uint32_t n, uint32_t i) const
+{
+ DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i));
+ return id;
+}
+
+std::unique_ptr<Document>
+BmFeed::make_document(uint32_t n, uint32_t i) const
+{
+ auto id = make_document_id(n, i);
+ auto document = std::make_unique<Document>(*_document_type, id);
+ document->setRepo(*_repo);
+ document->setFieldValue(_field, std::make_unique<IntFieldValue>(i));
+ return document;
+}
+
+std::unique_ptr<DocumentUpdate>
+BmFeed::make_document_update(uint32_t n, uint32_t i) const
+{
+ auto id = make_document_id(n, i);
+ auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id);
+ document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15))));
+ return document_update;
+}
+
+vespalib::nbostream
+BmFeed::make_put_feed(BmRange range, BucketSelector bucket_selector)
+{
+ vespalib::nbostream serialized_feed;
+ LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ auto n = bucket_selector(i);
+ serialized_feed << make_bucket_id(n);
+ auto document = make_document(n, i);
+ document->serialize(serialized_feed);
+ }
+ return serialized_feed;
+}
+
+vespalib::nbostream
+BmFeed::make_update_feed(BmRange range, BucketSelector bucket_selector)
+{
+ vespalib::nbostream serialized_feed;
+ LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ auto n = bucket_selector(i);
+ serialized_feed << make_bucket_id(n);
+ auto document_update = make_document_update(n, i);
+ document_update->serializeHEAD(serialized_feed);
+ }
+ return serialized_feed;
+}
+
+vespalib::nbostream
+BmFeed::make_remove_feed(BmRange range, BucketSelector bucket_selector)
+{
+ vespalib::nbostream serialized_feed;
+ LOG(debug, "make_remove_feed([%u..%u))", range.get_start(), range.get_end());
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ auto n = bucket_selector(i);
+ serialized_feed << make_bucket_id(n);
+ auto document_id = make_document_id(n, i);
+ vespalib::string raw_id = document_id.toString();
+ serialized_feed.write(raw_id.c_str(), raw_id.size() + 1);
+ }
+ return serialized_feed;
+}
+
+
+void
+BmFeed::put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "put_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ feed_handler.attach_bucket_info_queue(pending_tracker);
+ auto &repo = *_repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ auto document = std::make_unique<Document>(repo, is);
+ feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeed::update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "update_async_task([%u..%u))", range.get_start(), range.get_end());
+ PendingTracker pending_tracker(max_pending);
+ feed_handler.attach_bucket_info_queue(pending_tracker);
+ auto &repo = *_repo;
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ auto document_update = DocumentUpdate::createHEAD(repo, is);
+ feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeed::get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed)
+{
+ LOG(debug, "get_async_task([%u..%u))", range.get_start(), range.get_end());
+ search::bmcluster::PendingTracker pending_tracker(max_pending);
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ vespalib::string all_fields(document::AllFields::NAME);
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ DocumentId document_id(is);
+ feed_handler.get(bucket, all_fields, document_id, pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+void
+BmFeed::remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias)
+{
+ LOG(debug, "remove_async_task([%u..%u))", range.get_start(), range.get_end());
+ search::bmcluster::PendingTracker pending_tracker(max_pending);
+ feed_handler.attach_bucket_info_queue(pending_tracker);
+ vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
+ document::BucketId bucket_id;
+ bool use_timestamp = !feed_handler.manages_timestamp();
+ for (unsigned int i = range.get_start(); i < range.get_end(); ++i) {
+ is >> bucket_id;
+ document::Bucket bucket(_bucket_space, bucket_id);
+ DocumentId document_id(is);
+ feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + i) : 0), pending_tracker);
+ }
+ assert(is.empty());
+ pending_tracker.drain();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h
new file mode 100644
index 00000000000..cab22651251
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feed.h
@@ -0,0 +1,57 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
+
+namespace document {
+
+class Document;
+class DocumentType;
+class DocumentTypeRepo;
+class DocumentUpdate;
+class Field;
+
+}
+
+namespace vespalib { class nbostream; }
+
+namespace search::bmcluster {
+
+class BmRange;
+class BucketSelector;
+class IBmFeedHandler;
+
+/*
+ * Class to generate synthetic feed of documents.
+ */
+class BmFeed {
+ std::shared_ptr<const document::DocumentTypeRepo> _repo;
+ const document::DocumentType* _document_type;
+ const document::Field& _field;
+ uint32_t _bucket_bits;
+ document::BucketSpace _bucket_space;
+public:
+
+ BmFeed(std::shared_ptr<const document::DocumentTypeRepo> document_types);
+ ~BmFeed();
+ uint32_t num_buckets() const { return (1u << _bucket_bits); }
+ document::BucketSpace get_bucket_space() const noexcept { return _bucket_space; }
+ document::BucketId make_bucket_id(uint32_t n) const { return document::BucketId(_bucket_bits, n & (num_buckets() - 1)); }
+ document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); }
+ document::DocumentId make_document_id(uint32_t n, uint32_t i) const;
+ std::unique_ptr<document::Document> make_document(uint32_t n, uint32_t i) const;
+ std::unique_ptr<document::DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const;
+ vespalib::nbostream make_put_feed(BmRange range, BucketSelector bucket_selector);
+ vespalib::nbostream make_update_feed(BmRange range, BucketSelector bucket_selector);
+ vespalib::nbostream make_remove_feed(BmRange range, BucketSelector bucket_selector);
+ void put_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ void update_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+ void get_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed);
+ void remove_async_task(IBmFeedHandler& feed_handler, uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp
index b608593dada..fc6acbcab12 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp
@@ -24,7 +24,7 @@ using mbus::SourceSession;
using storage::mbusprot::StorageProtocol;
using storage::mbusprot::StorageReply;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h
index a9cff1fb826..7829a4e4946 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.h
@@ -16,12 +16,12 @@ class SourceSession;
}
-namespace feedbm {
+namespace search::bmcluster {
class PendingTracker;
/*
- * Message bus for feed benchmark program.
+ * Message bus for benchmark cluster.
*/
class BmMessageBus
{
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
new file mode 100644
index 00000000000..808ca348622
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -0,0 +1,675 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_node.h"
+#include "bm_cluster.h"
+#include "bm_cluster_params.h"
+#include "bm_message_bus.h"
+#include "bm_storage_chain_builder.h"
+#include "bm_storage_link_context.h"
+#include "storage_api_chain_bm_feed_handler.h"
+#include "storage_api_message_bus_bm_feed_handler.h"
+#include "storage_api_rpc_bm_feed_handler.h"
+#include "document_api_message_bus_bm_feed_handler.h"
+#include "i_bm_feed_handler.h"
+#include "spi_bm_feed_handler.h"
+#include <vespa/config-attributes.h>
+#include <vespa/config-bucketspaces.h>
+#include <vespa/config-imported-fields.h>
+#include <vespa/config-indexschema.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-rank-profiles.h>
+#include <vespa/config-slobroks.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config-stor-filestor.h>
+#include <vespa/config-summary.h>
+#include <vespa/config-summarymap.h>
+#include <vespa/config-upgrading.h>
+#include <vespa/config/common/configcontext.h>
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/document/fieldset/fieldsetrepo.h>
+#include <vespa/document/repo/configbuilder.h>
+#include <vespa/document/repo/document_type_repo_factory.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/metrics/config-metricsmanager.h>
+#include <vespa/searchcommon/common/schemaconfigurer.h>
+#include <vespa/searchcore/proton/common/alloc_config.h>
+#include <vespa/searchcore/proton/matching/querylimiter.h>
+#include <vespa/searchcore/proton/metrics/metricswireservice.h>
+#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
+#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
+#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
+#include <vespa/searchcore/proton/server/bootstrapconfig.h>
+#include <vespa/searchcore/proton/server/documentdb.h>
+#include <vespa/searchcore/proton/server/document_db_maintenance_config.h>
+#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
+#include <vespa/searchcore/proton/server/fileconfigmanager.h>
+#include <vespa/searchcore/proton/server/memoryconfigstore.h>
+#include <vespa/searchcore/proton/server/persistencehandlerproxy.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
+#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/searchlib/transactionlog/translogserver.h>
+#include <vespa/searchsummary/config/config-juniperrc.h>
+#include <vespa/storage/bucketdb/config-stor-bucket-init.h>
+#include <vespa/storage/common/i_storage_chain_builder.h>
+#include <vespa/storage/config/config-stor-bouncer.h>
+#include <vespa/storage/config/config-stor-communicationmanager.h>
+#include <vespa/storage/config/config-stor-distributormanager.h>
+#include <vespa/storage/config/config-stor-opslogger.h>
+#include <vespa/storage/config/config-stor-prioritymapping.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/config/config-stor-status.h>
+#include <vespa/storage/config/config-stor-visitordispatcher.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/storageserver/app/distributorprocess.h>
+#include <vespa/storageserver/app/servicelayerprocess.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <tests/proton/common/dummydbowner.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bmcluster.bm_node");
+
+using cloud::config::SlobroksConfigBuilder;
+using cloud::config::filedistribution::FiledistributorrpcConfig;
+using config::ConfigSet;
+using document::BucketSpace;
+using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
+using document::DocumentType;
+using document::DocumentTypeRepo;
+using document::Field;
+using messagebus::MessagebusConfigBuilder;
+using metrics::MetricsmanagerConfigBuilder;
+using proton::BootstrapConfig;
+using proton::DocTypeName;
+using proton::DocumentDB;
+using proton::DocumentDBConfig;
+using proton::HwInfo;
+using search::index::Schema;
+using search::index::SchemaBuilder;
+using search::transactionlog::TransLogServer;
+using storage::rpc::SharedRpcResources;
+using storage::rpc::StorageApiRpcService;
+using storage::spi::PersistenceProvider;
+using vespa::config::content::PersistenceConfigBuilder;
+using vespa::config::content::StorDistributionConfigBuilder;
+using vespa::config::content::StorFilestorConfigBuilder;
+using vespa::config::content::UpgradingConfigBuilder;
+using vespa::config::content::core::BucketspacesConfig;
+using vespa::config::content::core::BucketspacesConfigBuilder;
+using vespa::config::content::core::StorBouncerConfigBuilder;
+using vespa::config::content::core::StorBucketInitConfigBuilder;
+using vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
+using vespa::config::content::core::StorDistributormanagerConfigBuilder;
+using vespa::config::content::core::StorOpsloggerConfigBuilder;
+using vespa::config::content::core::StorPrioritymappingConfigBuilder;
+using vespa::config::content::core::StorServerConfigBuilder;
+using vespa::config::content::core::StorStatusConfigBuilder;
+using vespa::config::content::core::StorVisitorConfigBuilder;
+using vespa::config::content::core::StorVisitordispatcherConfigBuilder;
+using vespa::config::search::AttributesConfig;
+using vespa::config::search::AttributesConfigBuilder;
+using vespa::config::search::ImportedFieldsConfig;
+using vespa::config::search::IndexschemaConfig;
+using vespa::config::search::RankProfilesConfig;
+using vespa::config::search::SummaryConfig;
+using vespa::config::search::SummarymapConfig;
+using vespa::config::search::core::ProtonConfig;
+using vespa::config::search::core::ProtonConfigBuilder;
+using vespa::config::search::summary::JuniperrcConfig;
+using vespalib::compression::CompressionConfig;
+
+namespace search::bmcluster {
+
+vespalib::string base_dir = "testdb";
+
+std::shared_ptr<AttributesConfig> make_attributes_config() {
+ AttributesConfigBuilder builder;
+ AttributesConfig::Attribute attribute;
+ attribute.name = "int";
+ attribute.datatype = AttributesConfig::Attribute::Datatype::INT32;
+ builder.attribute.emplace_back(attribute);
+ return std::make_shared<AttributesConfig>(builder);
+}
+
+std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const DocumentTypeRepo> repo, const DocTypeName& doc_type_name)
+{
+ auto indexschema = std::make_shared<IndexschemaConfig>();
+ auto attributes = make_attributes_config();
+ auto summary = std::make_shared<SummaryConfig>();
+ std::shared_ptr<Schema> schema(new Schema());
+ SchemaBuilder::build(*indexschema, *schema);
+ SchemaBuilder::build(*attributes, *schema);
+ SchemaBuilder::build(*summary, *schema);
+ return std::make_shared<DocumentDBConfig>(
+ 1,
+ std::make_shared<RankProfilesConfig>(),
+ std::make_shared<proton::matching::RankingConstants>(),
+ std::make_shared<proton::matching::RankingExpressions>(),
+ std::make_shared<proton::matching::OnnxModels>(),
+ indexschema,
+ attributes,
+ summary,
+ std::make_shared<SummarymapConfig>(),
+ std::make_shared<JuniperrcConfig>(),
+ document_types,
+ repo,
+ std::make_shared<ImportedFieldsConfig>(),
+ std::make_shared<TuneFileDocumentDB>(),
+ schema,
+ std::make_shared<proton::DocumentDBMaintenanceConfig>(),
+ search::LogDocumentStore::Config(),
+ std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make(1)),
+ std::make_shared<const proton::AllocConfig>(),
+ "client",
+ doc_type_name.getName());
+}
+
+void
+make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
+{
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+}
+
+void
+make_bucketspaces_config(BucketspacesConfigBuilder& bucketspaces)
+{
+ BucketspacesConfigBuilder::Documenttype bucket_space_map;
+ bucket_space_map.name = "test";
+ bucket_space_map.bucketspace = "default";
+ bucketspaces.documenttype.emplace_back(std::move(bucket_space_map));
+}
+
+class MyPersistenceEngineOwner : public proton::IPersistenceEngineOwner
+{
+ void setClusterState(BucketSpace, const storage::spi::ClusterState&) override { }
+};
+
+struct MyResourceWriteFilter : public proton::IResourceWriteFilter
+{
+ bool acceptWriteOperation() const override { return true; }
+ State getAcceptState() const override { return IResourceWriteFilter::State(); }
+};
+
+class MyServiceLayerProcess : public storage::ServiceLayerProcess {
+ PersistenceProvider& _provider;
+
+public:
+ MyServiceLayerProcess(const config::ConfigUri& configUri,
+ PersistenceProvider& provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder);
+ ~MyServiceLayerProcess() override { shutdown(); }
+
+ void shutdown() override;
+ void setupProvider() override;
+ PersistenceProvider& getProvider() override;
+};
+
+MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri& configUri,
+ PersistenceProvider& provider,
+ std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
+ : ServiceLayerProcess(configUri),
+ _provider(provider)
+{
+ if (chain_builder) {
+ set_storage_chain_builder(std::move(chain_builder));
+ }
+}
+
+void
+MyServiceLayerProcess::shutdown()
+{
+ ServiceLayerProcess::shutdown();
+}
+
+void
+MyServiceLayerProcess::setupProvider()
+{
+}
+
+PersistenceProvider&
+MyServiceLayerProcess::getProvider()
+{
+ return _provider;
+}
+
+struct StorageConfigSet
+{
+ vespalib::string config_id;
+ DocumenttypesConfigBuilder documenttypes;
+ StorDistributionConfigBuilder stor_distribution;
+ StorBouncerConfigBuilder stor_bouncer;
+ StorCommunicationmanagerConfigBuilder stor_communicationmanager;
+ StorOpsloggerConfigBuilder stor_opslogger;
+ StorPrioritymappingConfigBuilder stor_prioritymapping;
+ UpgradingConfigBuilder upgrading;
+ StorServerConfigBuilder stor_server;
+ StorStatusConfigBuilder stor_status;
+ BucketspacesConfigBuilder bucketspaces;
+ MetricsmanagerConfigBuilder metricsmanager;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ StorageConfigSet(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
+ : config_id(config_id_in),
+ documenttypes(documenttypes_in),
+ stor_distribution(),
+ stor_bouncer(),
+ stor_communicationmanager(),
+ stor_opslogger(),
+ stor_prioritymapping(),
+ upgrading(),
+ stor_server(),
+ stor_status(),
+ bucketspaces(),
+ metricsmanager(),
+ slobroks(),
+ messagebus()
+ {
+ {
+ auto& dc = stor_distribution;
+ {
+ StorDistributionConfigBuilder::Group group;
+ {
+ StorDistributionConfigBuilder::Group::Nodes node;
+ node.index = 0;
+ group.nodes.push_back(std::move(node));
+ }
+ group.index = "invalid";
+ group.name = "invalid";
+ group.capacity = 1.0;
+ group.partitions = "";
+ dc.group.push_back(std::move(group));
+ }
+ dc.redundancy = 1;
+ dc.readyCopies = 1;
+ }
+ stor_server.isDistributor = distributor;
+ stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
+ if (distributor) {
+ stor_server.rootFolder = "distributor";
+ } else {
+ stor_server.rootFolder = "storage";
+ }
+ make_slobroks_config(slobroks, slobrok_port);
+ stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
+ stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakeup();
+ stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node();
+ stor_communicationmanager.mbusport = mbus_port;
+ stor_communicationmanager.rpcport = rpc_port;
+ stor_communicationmanager.skipThread = params.get_skip_communicationmanager_thread();
+
+ stor_status.httpport = status_port;
+ make_bucketspaces_config(bucketspaces);
+ }
+
+ ~StorageConfigSet();
+
+ void add_builders(ConfigSet& set) {
+ set.addBuilder(config_id, &documenttypes);
+ set.addBuilder(config_id, &stor_distribution);
+ set.addBuilder(config_id, &stor_bouncer);
+ set.addBuilder(config_id, &stor_communicationmanager);
+ set.addBuilder(config_id, &stor_opslogger);
+ set.addBuilder(config_id, &stor_prioritymapping);
+ set.addBuilder(config_id, &upgrading);
+ set.addBuilder(config_id, &stor_server);
+ set.addBuilder(config_id, &stor_status);
+ set.addBuilder(config_id, &bucketspaces);
+ set.addBuilder(config_id, &metricsmanager);
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+StorageConfigSet::~StorageConfigSet() = default;
+
+struct ServiceLayerConfigSet : public StorageConfigSet
+{
+ PersistenceConfigBuilder persistence;
+ StorFilestorConfigBuilder stor_filestor;
+ StorBucketInitConfigBuilder stor_bucket_init;
+ StorVisitorConfigBuilder stor_visitor;
+
+ ServiceLayerConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
+ : StorageConfigSet(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ persistence(),
+ stor_filestor(),
+ stor_bucket_init(),
+ stor_visitor()
+ {
+ stor_filestor.numResponseThreads = params.get_response_threads();
+ stor_filestor.numNetworkThreads = params.get_rpc_network_threads();
+ stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule();
+ }
+
+ ~ServiceLayerConfigSet();
+
+ void add_builders(ConfigSet& set) {
+ StorageConfigSet::add_builders(set);
+ set.addBuilder(config_id, &persistence);
+ set.addBuilder(config_id, &stor_filestor);
+ set.addBuilder(config_id, &stor_bucket_init);
+ set.addBuilder(config_id, &stor_visitor);
+ }
+};
+
+ServiceLayerConfigSet::~ServiceLayerConfigSet() = default;
+
+struct DistributorConfigSet : public StorageConfigSet
+{
+ StorDistributormanagerConfigBuilder stor_distributormanager;
+ StorVisitordispatcherConfigBuilder stor_visitordispatcher;
+
+ DistributorConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
+ : StorageConfigSet(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ stor_distributormanager(),
+ stor_visitordispatcher()
+ {
+ stor_distributormanager.numDistributorStripes = params.get_distributor_stripes();
+ }
+
+ ~DistributorConfigSet();
+
+ void add_builders(ConfigSet& set) {
+ StorageConfigSet::add_builders(set);
+ set.addBuilder(config_id, &stor_distributormanager);
+ set.addBuilder(config_id, &stor_visitordispatcher);
+ }
+};
+
+DistributorConfigSet::~DistributorConfigSet() = default;
+
+BmNode::BmNode(std::shared_ptr<document::DocumenttypesConfig> document_types)
+ : _document_types(std::move(document_types)),
+ _repo(document::DocumentTypeRepoFactory::make(*_document_types)),
+ _doc_type_name("test"),
+ _document_type(_repo->getDocumentType(_doc_type_name.getName())),
+ _field(_document_type->getField("int"))
+{
+}
+
+BmNode::~BmNode() = default;
+
+class MyBmNode : public BmNode
+{
+ std::shared_ptr<DocumentDBConfig> _document_db_config;
+ vespalib::string _base_dir;
+ search::index::DummyFileHeaderContext _file_header_context;
+ int _tls_listen_port;
+ int _slobrok_port;
+ int _service_layer_mbus_port;
+ int _service_layer_rpc_port;
+ int _service_layer_status_port;
+ int _distributor_mbus_port;
+ int _distributor_rpc_port;
+ int _distributor_status_port;
+ TransLogServer _tls;
+ vespalib::string _tls_spec;
+ proton::matching::QueryLimiter _query_limiter;
+ vespalib::Clock _clock;
+ proton::DummyWireService _metrics_wire_service;
+ proton::MemoryConfigStores _config_stores;
+ vespalib::ThreadStackExecutor _summary_executor;
+ proton::DummyDBOwner _document_db_owner;
+ BucketSpace _bucket_space;
+ std::shared_ptr<DocumentDB> _document_db;
+ MyPersistenceEngineOwner _persistence_owner;
+ MyResourceWriteFilter _write_filter;
+ proton::test::DiskMemUsageNotifier _disk_mem_usage_notifier;
+ std::shared_ptr<proton::PersistenceEngine> _persistence_engine;
+ std::unique_ptr<const document::FieldSetRepo> _field_set_repo;
+ ServiceLayerConfigSet _service_layer_config;
+ DistributorConfigSet _distributor_config;
+ ConfigSet _config_set;
+ std::shared_ptr<config::IConfigContext> _config_context;
+ std::unique_ptr<IBmFeedHandler> _feed_handler;
+ std::unique_ptr<mbus::Slobrok> _slobrok;
+ std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
+ std::unique_ptr<MyServiceLayerProcess> _service_layer;
+ std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
+ std::unique_ptr<storage::DistributorProcess> _distributor;
+
+ void create_document_db(const BmClusterParams& params);
+public:
+ MyBmNode(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types);
+ ~MyBmNode() override;
+ std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) override;
+ void start_service_layer(const BmClusterParams& params) override;
+ void wait_service_layer() override;
+ void start_distributor(const BmClusterParams& params) override;
+ void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) override;
+ void shutdown_feed_handler() override;
+ void shutdown_distributor() override;
+ void shutdown_service_layer() override;
+ IBmFeedHandler* get_feed_handler() override;
+ PersistenceProvider* get_persistence_provider() override;
+};
+
+MyBmNode::MyBmNode(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types)
+ : BmNode(std::move(document_types)),
+ _document_db_config(make_document_db_config(_document_types, _repo, _doc_type_name)),
+ _base_dir(base_dir),
+ _file_header_context(),
+ _tls_listen_port(9017),
+ _slobrok_port(9018),
+ _service_layer_mbus_port(9020),
+ _service_layer_rpc_port(9021),
+ _service_layer_status_port(9022),
+ _distributor_mbus_port(9023),
+ _distributor_rpc_port(9024),
+ _distributor_status_port(9025),
+ _tls("tls", _tls_listen_port, _base_dir, _file_header_context),
+ _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)),
+ _query_limiter(),
+ _clock(),
+ _metrics_wire_service(),
+ _config_stores(),
+ _summary_executor(8, 128_Ki),
+ _document_db_owner(),
+ _bucket_space(document::test::makeBucketSpace(_doc_type_name.getName())),
+ _document_db(),
+ _persistence_owner(),
+ _write_filter(),
+ _disk_mem_usage_notifier(),
+ _persistence_engine(),
+ _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
+ _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
+ _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
+ _config_set(),
+ _config_context(std::make_shared<config::ConfigContext>(_config_set)),
+ _feed_handler(),
+ _slobrok(),
+ _service_layer_chain_context(),
+ _service_layer(),
+ _distributor_chain_context(),
+ _distributor()
+{
+ create_document_db(params);
+ _persistence_engine = std::make_unique<proton::PersistenceEngine>(_persistence_owner, _write_filter, _disk_mem_usage_notifier, -1, false);
+ auto proxy = std::make_shared<proton::PersistenceHandlerProxy>(_document_db);
+ _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
+ _service_layer_config.add_builders(_config_set);
+ _distributor_config.add_builders(_config_set);
+ _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info());
+}
+
+MyBmNode::~MyBmNode()
+{
+ if (_persistence_engine) {
+ _persistence_engine->destroyIterators();
+ _persistence_engine->removeHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name);
+ }
+ if (_document_db) {
+ _document_db->close();
+ }
+}
+
+
+void
+MyBmNode::create_document_db(const BmClusterParams& params)
+{
+ vespalib::mkdir(_base_dir, false);
+ vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false);
+ vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig";
+ {
+ proton::FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName());
+ fileCfg.saveConfig(*_document_db_config, 1);
+ }
+ config::DirSpec spec(input_cfg + "/config-1");
+ auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>();
+ proton::DocumentDBConfigHelper mgr(spec, _doc_type_name.getName());
+ auto protonCfg = std::make_shared<ProtonConfigBuilder>();
+ if ( ! params.get_indexing_sequencer().empty()) {
+ vespalib::string sequencer = params.get_indexing_sequencer();
+ std::transform(sequencer.begin(), sequencer.end(), sequencer.begin(), [](unsigned char c){ return std::toupper(c); });
+ protonCfg->indexing.optimize = ProtonConfig::Indexing::getOptimize(sequencer);
+ }
+ auto bootstrap_config = std::make_shared<BootstrapConfig>(1,
+ _document_types,
+ _repo,
+ std::move(protonCfg),
+ std::make_shared<FiledistributorrpcConfig>(),
+ std::make_shared<BucketspacesConfig>(),
+ tuneFileDocDB, HwInfo());
+ mgr.forwardConfig(bootstrap_config);
+ mgr.nextGeneration(0ms);
+ _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name,
+ _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner,
+ _summary_executor, _summary_executor, *_persistence_engine, _tls,
+ _metrics_wire_service, _file_header_context,
+ _config_stores.getConfigStore(_doc_type_name.toString()),
+ std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo());
+ _document_db->start();
+ _document_db->waitForOnlineState();
+}
+
+std::unique_ptr<SpiBmFeedHandler>
+MyBmNode::make_create_bucket_feed_handler(bool skip_get_spi_bucket_info)
+{
+ return std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, skip_get_spi_bucket_info);
+}
+
+void
+MyBmNode::start_service_layer(const BmClusterParams& params)
+{
+ config::ConfigUri config_uri("bm-servicelayer", _config_context);
+ std::unique_ptr<BmStorageChainBuilder> chain_builder;
+ if (params.get_use_storage_chain() && !params.needs_distributor()) {
+ chain_builder = std::make_unique<BmStorageChainBuilder>();
+ _service_layer_chain_context = chain_builder->get_context();
+ }
+ _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri,
+ *_persistence_engine,
+ std::move(chain_builder));
+ _service_layer->setupConfig(100ms);
+ _service_layer->createNode();
+}
+
+void
+MyBmNode::wait_service_layer()
+{
+ _service_layer->getNode().waitUntilInitialized();
+}
+
+void
+MyBmNode::start_distributor(const BmClusterParams& params)
+{
+ config::ConfigUri config_uri("bm-distributor", _config_context);
+ std::unique_ptr<BmStorageChainBuilder> chain_builder;
+ if (params.get_use_storage_chain() && !params.get_use_document_api()) {
+ chain_builder = std::make_unique<BmStorageChainBuilder>();
+ _distributor_chain_context = chain_builder->get_context();
+ }
+ _distributor = std::make_unique<storage::DistributorProcess>(config_uri);
+ if (chain_builder) {
+ _distributor->set_storage_chain_builder(std::move(chain_builder));
+ }
+ _distributor->setupConfig(100ms);
+ _distributor->createNode();
+}
+
+void
+MyBmNode::create_feed_handler(const BmClusterParams& params, BmCluster& cluster)
+{
+ StorageApiRpcService::Params rpc_params;
+ // This is the same compression config as the default in stor-communicationmanager.def.
+ rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
+ rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node();
+ if (params.get_use_document_api()) {
+ _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(cluster.get_message_bus());
+ } else if (params.get_enable_distributor()) {
+ if (params.get_use_storage_chain()) {
+ assert(_distributor_chain_context);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), true);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, true);
+ }
+ } else if (params.needs_service_layer()) {
+ if (params.get_use_storage_chain()) {
+ assert(_service_layer_chain_context);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), false);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, false);
+ }
+ }
+}
+
+void
+MyBmNode::shutdown_feed_handler()
+{
+ _feed_handler.reset();
+}
+
+void
+MyBmNode::shutdown_distributor()
+{
+ if (_distributor) {
+ LOG(info, "stop distributor");
+ _distributor->getNode().requestShutdown("controlled shutdown");
+ _distributor->shutdown();
+ }
+}
+
+void
+MyBmNode::shutdown_service_layer()
+{
+ if (_service_layer) {
+ LOG(info, "stop service layer");
+ _service_layer->getNode().requestShutdown("controlled shutdown");
+ _service_layer->shutdown();
+ }
+}
+
+IBmFeedHandler*
+MyBmNode::get_feed_handler()
+{
+ return _feed_handler.get();
+}
+
+PersistenceProvider*
+MyBmNode::get_persistence_provider()
+{
+ return _persistence_engine.get();
+}
+
+std::unique_ptr<BmNode>
+BmNode::create(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types)
+{
+ return std::make_unique<MyBmNode>(params, std::move(document_types));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
new file mode 100644
index 00000000000..1212aeb4b5a
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
@@ -0,0 +1,56 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+#include <vespa/document/config/config-documenttypes.h>
+#include <vespa/searchcore/proton/common/doctypename.h>
+
+namespace document {
+
+class DocumentTypeRepo;
+class DocumentType;
+class Field;
+
+};
+
+namespace storage::spi { struct PersistenceProvider; }
+
+namespace search::bmcluster {
+
+class BmCluster;
+class BmClusterParams;
+class IBmFeedHandler;
+class SpiBmFeedHandler;
+
+/*
+ * Class representing a single benchmark node in a benchmark cluster.
+ */
+class BmNode {
+protected:
+ std::shared_ptr<document::DocumenttypesConfig> _document_types;
+ std::shared_ptr<const document::DocumentTypeRepo> _repo;
+ proton::DocTypeName _doc_type_name;
+ const document::DocumentType* _document_type;
+ const document::Field& _field;
+
+ BmNode(std::shared_ptr<document::DocumenttypesConfig> document_types);
+public:
+ virtual ~BmNode();
+ virtual std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) = 0;
+ virtual void start_service_layer(const BmClusterParams& params) = 0;
+ virtual void wait_service_layer() = 0;
+ virtual void start_distributor(const BmClusterParams& params) = 0;
+ virtual void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) = 0;
+ virtual void shutdown_feed_handler() = 0;
+ virtual void shutdown_distributor() = 0;
+ virtual void shutdown_service_layer() = 0;
+ virtual IBmFeedHandler* get_feed_handler() = 0;
+ virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0;
+ static std::unique_ptr<BmNode> create(const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types);
+ const proton::DocTypeName& get_doc_type_name() const noexcept { return _doc_type_name; }
+ const document::DocumentType *get_document_type() const noexcept { return _document_type; }
+ const document::Field& get_field() const noexcept { return _field; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_range.h b/searchcore/src/vespa/searchcore/bmcluster/bm_range.h
new file mode 100644
index 00000000000..c4834f8c17b
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_range.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace search::bmcluster {
+
+/*
+ * Range of document "keys" used to generate documents
+ */
+class BmRange
+{
+ uint32_t _start;
+ uint32_t _end;
+public:
+ BmRange(uint32_t start_in, uint32_t end_in)
+ : _start(start_in),
+ _end(end_in)
+ {
+ }
+ uint32_t get_start() const { return _start; }
+ uint32_t get_end() const { return _end; }
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp
index bbe0de70ce2..16883e1cc48 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.cpp
@@ -7,7 +7,7 @@
#include <vespa/log/log.h>
LOG_SETUP(".bm_storage_chain_builder");
-namespace feedbm {
+namespace search::bmcluster {
BmStorageChainBuilder::BmStorageChainBuilder()
: storage::StorageChainBuilder(),
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h
index bba933da9e0..c61cb200c36 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_chain_builder.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_chain_builder.h
@@ -4,7 +4,7 @@
#include <vespa/storage/common/storage_chain_builder.h>
-namespace feedbm {
+namespace search::bmcluster {
struct BmStorageLinkContext;
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp
index 2aeda91c30c..c251c25b15d 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.cpp
@@ -3,7 +3,7 @@
#include "bm_storage_link.h"
#include "pending_tracker.h"
-namespace feedbm {
+namespace search::bmcluster {
BmStorageLink::BmStorageLink()
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h
index 95528d7b2d9..8c98479a38b 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link.h
@@ -6,7 +6,7 @@
#include "pending_tracker_hash.h"
#include <vespa/storage/common/storagelink.h>
-namespace feedbm {
+namespace search::bmcluster {
class PendingTracker;
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h
index f2df20f1f66..f7cc1841770 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link_context.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_link_context.h
@@ -1,6 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-namespace feedbm {
+namespace search::bmcluster {
class BmStorageLink;
diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
index fc43402d68e..6670707ed39 100644
--- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
@@ -3,7 +3,7 @@
#include "bucket_info_queue.h"
#include <vespa/persistence/spi/persistenceprovider.h>
-namespace feedbm {
+namespace search::bmcluster {
BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors)
: _mutex(),
diff --git a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h
index 07a55127234..1a48f9fa478 100644
--- a/searchcore/src/apps/vespa-feed-bm/bucket_info_queue.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h
@@ -9,7 +9,7 @@
namespace storage::spi { struct PersistenceProvider; }
-namespace feedbm {
+namespace search::bmcluster {
/*
* Class containing a queue of buckets where mutating feed operations
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h
new file mode 100644
index 00000000000..7401a12cba0
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_selector.h
@@ -0,0 +1,28 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace search::bmcluster {
+
+/*
+ * Map from document index to bucket to ensure even spread between buckets
+ * while ensuring that each bucket used belong to a specific thread.
+ */
+class BucketSelector
+{
+ uint32_t _thread_id;
+ uint32_t _threads;
+ uint32_t _num_buckets;
+public:
+ BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in)
+ : _thread_id(thread_id_in),
+ _threads(threads_in),
+ _num_buckets((num_buckets_in / _threads) * _threads)
+ {
+ }
+ uint64_t operator()(uint32_t i) const {
+ return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets;
+ }
+};
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp
index 38c8490de69..c6f2626f27c 100644
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp
@@ -17,7 +17,7 @@ using document::DocumentUpdate;
using storage::api::StorageMessageAddress;
using storage::lib::NodeType;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
vespalib::string _Storage("storage");
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h
index c71bb113c5b..5358e0a948b 100644
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h
@@ -9,7 +9,7 @@ namespace document { class DocumentTypeRepo; }
namespace documentapi { class DocumentMessage; };
namespace storage::api { class StorageMessageAddress; }
-namespace feedbm {
+namespace search::bmcluster {
class BmMessageBus;
diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h
index 26cbf27b455..fc3953c49a5 100644
--- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/i_bm_feed_handler.h
@@ -12,7 +12,7 @@ class DocumentUpdate;
class DocumentId;
}
-namespace feedbm {
+namespace search::bmcluster {
class BucketInfoQueue;
class PendingTracker;
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp
index 94bed4cb3bd..247bf8bece3 100644
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp
@@ -7,7 +7,7 @@
using namespace std::chrono_literals;
-namespace feedbm {
+namespace search::bmcluster {
PendingTracker::PendingTracker(uint32_t limit)
: _pending(0u),
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h
index 4ca84ab7442..a8fa2f77396 100644
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h
@@ -7,7 +7,7 @@
namespace storage::spi { struct PersistenceProvider; }
-namespace feedbm {
+namespace search::bmcluster {
class BucketInfoQueue;
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp
index 6863d35703e..515f7f6b2de 100644
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.cpp
@@ -5,7 +5,7 @@
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>
-namespace feedbm {
+namespace search::bmcluster {
PendingTrackerHash::PendingTrackerHash()
: _mutex(),
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h
index 89be93fd4ed..de9b6f63aa4 100644
--- a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker_hash.h
@@ -5,7 +5,7 @@
#include <vespa/vespalib/stllike/hash_map.h>
#include <mutex>
-namespace feedbm {
+namespace search::bmcluster {
class PendingTracker;
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index 11149eecb3f..2aa7ee5ec47 100644
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -15,7 +15,7 @@ using storage::spi::Bucket;
using storage::spi::PersistenceProvider;
using storage::spi::Timestamp;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
index a78aa06628b..3a24a5b38b7 100644
--- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
@@ -8,7 +8,7 @@
namespace document { class FieldSetRepo; }
namespace storage::spi { struct PersistenceProvider; }
-namespace feedbm {
+namespace search::bmcluster {
/*
* Benchmark feed handler for feed directly to persistence provider
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
index 82cf2df065f..34669b8cbdc 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
@@ -18,7 +18,7 @@ using document::Document;
using document::DocumentId;
using document::DocumentUpdate;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
index 0c4b715122e..1c196d746eb 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
@@ -6,7 +6,7 @@
namespace storage::api { class StorageCommand; }
-namespace feedbm {
+namespace search::bmcluster {
struct BmStorageLinkContext;
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
index f63a8e33cc0..04561b5d93e 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
@@ -15,7 +15,7 @@ using document::DocumentUpdate;
using storage::api::StorageMessageAddress;
using storage::lib::NodeType;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
vespalib::string _Storage("storage");
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
index 2aafd0c6830..0027f260b8f 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
@@ -12,7 +12,7 @@ class StorageCommand;
class StorageMessageAddress;
}
-namespace feedbm {
+namespace search::bmcluster {
class BmMessageBus;
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
index 04d49bba0a3..3e0426cb308 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
@@ -22,7 +22,7 @@ using storage::rpc::SharedRpcResources;
using storage::rpc::StorageApiRpcService;
using storage::lib::NodeType;
-namespace feedbm {
+namespace search::bmcluster {
namespace {
vespalib::string _Storage("storage");
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
index 5057d8889a5..360f702e590 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
@@ -16,7 +16,7 @@ class MessageCodecProvider;
class SharedRpcResources;
}
-namespace feedbm {
+namespace search::bmcluster {
/*
* Benchmark feed handler for feed to service layer or distributor
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp
index 260b0c8a7af..ec1ebec2954 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.cpp
@@ -6,7 +6,7 @@
#include <vespa/log/log.h>
LOG_SETUP(".storage_reply_error_checker");
-namespace feedbm {
+namespace search::bmcluster {
StorageReplyErrorChecker::StorageReplyErrorChecker()
: _errors(0u)
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h
index 4743367b426..2fcb6aad14a 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h
@@ -6,7 +6,7 @@
namespace storage::api { class StorageMessage; }
-namespace feedbm {
+namespace search::bmcluster {
class StorageReplyErrorChecker {
protected: