From fc89590cc955e6ef2cde7cb44c833a561b14806b Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Tue, 14 Sep 2021 13:05:56 +0200 Subject: Move ownership of feed handlers from BmNode to BmCluster. --- .../src/vespa/searchcore/bmcluster/CMakeLists.txt | 3 + .../src/vespa/searchcore/bmcluster/bm_cluster.cpp | 110 ++++++++++++++++----- .../src/vespa/searchcore/bmcluster/bm_cluster.h | 17 +++- .../searchcore/bmcluster/bm_cluster_controller.cpp | 14 ++- .../searchcore/bmcluster/bm_cluster_controller.h | 6 +- .../vespa/searchcore/bmcluster/bm_distribution.cpp | 98 ++++++++++++++++++ .../vespa/searchcore/bmcluster/bm_distribution.h | 34 +++++++ .../searchcore/bmcluster/bm_message_bus_routes.cpp | 19 ++++ .../searchcore/bmcluster/bm_message_bus_routes.h | 24 +++++ .../src/vespa/searchcore/bmcluster/bm_node.cpp | 88 +++-------------- .../src/vespa/searchcore/bmcluster/bm_node.h | 6 +- .../bmcluster/bm_storage_message_addresses.cpp | 27 +++++ .../bmcluster/bm_storage_message_addresses.h | 28 ++++++ .../searchcore/bmcluster/bucket_info_queue.cpp | 13 ++- .../vespa/searchcore/bmcluster/bucket_info_queue.h | 10 +- .../document_api_message_bus_bm_feed_handler.cpp | 40 ++++---- .../document_api_message_bus_bm_feed_handler.h | 19 ++-- .../vespa/searchcore/bmcluster/i_bm_distribution.h | 34 +++++++ .../vespa/searchcore/bmcluster/pending_tracker.cpp | 4 +- .../vespa/searchcore/bmcluster/pending_tracker.h | 4 +- .../searchcore/bmcluster/spi_bm_feed_handler.cpp | 70 +++++++++---- .../searchcore/bmcluster/spi_bm_feed_handler.h | 18 ++-- .../storage_api_chain_bm_feed_handler.cpp | 49 ++++----- .../bmcluster/storage_api_chain_bm_feed_handler.h | 14 ++- .../storage_api_message_bus_bm_feed_handler.cpp | 28 ++++-- .../storage_api_message_bus_bm_feed_handler.h | 24 +++-- .../bmcluster/storage_api_rpc_bm_feed_handler.cpp | 32 +++--- .../bmcluster/storage_api_rpc_bm_feed_handler.h | 12 ++- .../bmcluster/storage_reply_error_checker.h | 3 + 29 files changed, 599 insertions(+), 249 deletions(-) create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp create mode 100644 searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h create mode 100644 searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 8d6dbd0f938..44e13c5954e 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -4,11 +4,14 @@ vespa_add_library(searchcore_bmcluster STATIC bm_cluster.cpp bm_cluster_controller.cpp bm_cluster_params.cpp + bm_distribution.cpp bm_feed.cpp bm_message_bus.cpp + bm_message_bus_routes.cpp bm_node.cpp bm_storage_chain_builder.cpp bm_storage_link.cpp + bm_storage_message_addresses.cpp bucket_info_queue.cpp document_api_message_bus_bm_feed_handler.cpp pending_tracker.cpp diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp index 23f3f6d1e30..b7886a08325 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -2,11 +2,17 @@ #include "bm_cluster.h" #include "bm_cluster_controller.h" +#include "bm_distribution.h" #include "bm_feed.h" #include "bm_message_bus.h" #include "bm_node.h" +#include "document_api_message_bus_bm_feed_handler.h" #include "spi_bm_feed_handler.h" +#include "storage_api_chain_bm_feed_handler.h" +#include "storage_api_message_bus_bm_feed_handler.h" +#include "storage_api_rpc_bm_feed_handler.h" #include +#include #include #include #include @@ -23,6 +29,9 @@ using cloud::config::SlobroksConfigBuilder; using config::ConfigSet; using messagebus::MessagebusConfigBuilder; using storage::rpc::SharedRpcResources; +using storage::rpc::StorageApiRpcService; +using storage::spi::PersistenceProvider; +using vespalib::compression::CompressionConfig; namespace search::bmcluster { @@ -51,6 +60,34 @@ make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) slobroks.slobrok.push_back(std::move(slobrok)); } +std::vector> +collect_storage_link_contexts(const std::vector> &nodes, bool distributor) +{ + std::vector> contexts; + for (auto& node : nodes) { + if (node) { + contexts.emplace_back(node->get_storage_link_context(distributor)); + } else { + contexts.emplace_back(); + } + } + return contexts; +} + +std::vector +collect_persistence_providers(const std::vector> &nodes) +{ + std::vector providers; + for (auto& node : nodes) { + if (node) { + providers.emplace_back(node->get_persistence_provider()); + } else { + providers.emplace_back(nullptr); + } + } + return providers; +} + } struct BmCluster::MessageBusConfigSet { @@ -109,8 +146,10 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl _base_port(base_port), _document_types(std::move(document_types)), _repo(std::move(repo)), - _nodes(params.get_num_nodes()) - + _field_set_repo(std::make_unique(*_repo)), + _distribution(std::make_shared(params.get_num_nodes())), + _nodes(params.get_num_nodes()), + _feed_handler() { _message_bus_config->add_builders(*_config_set); _rpc_client_config->add_builders(*_config_set); @@ -221,14 +260,6 @@ BmCluster::make_nodes() } } -BmNode& -BmCluster::get_node(uint32_t node_idx) -{ - assert(node_idx < _nodes.size()); - assert(_nodes[node_idx]); - return *_nodes[node_idx]; -} - void BmCluster::initialize_providers() { @@ -244,9 +275,15 @@ void BmCluster::create_buckets(BmFeed& feed) { LOG(info, "create %u buckets", feed.num_buckets()); - auto& node = get_node(0); for (unsigned int i = 0; i < feed.num_buckets(); ++i) { - node.create_bucket(feed.make_bucket(i)); + auto bucket = feed.make_bucket(i); + uint32_t node_idx = _distribution->get_service_layer_node_idx(bucket); + if (node_idx < _nodes.size()) { + auto& node = _nodes[node_idx]; + if (node) { + node->create_bucket(feed.make_bucket(i)); + } + } } } @@ -270,7 +307,7 @@ BmCluster::start_service_layers() node->wait_service_layer_slobrok(); } } - BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes()); + BmClusterController fake_controller(get_rpc_client(), *_distribution); uint32_t node_idx = 0; for (const auto &node : _nodes) { if (node) { @@ -293,7 +330,7 @@ BmCluster::start_distributors() node->wait_distributor_slobrok(); } } - BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes()); + BmClusterController fake_controller(get_rpc_client(), *_distribution); uint32_t node_idx = 0; for (const auto &node : _nodes) { if (node) { @@ -306,23 +343,42 @@ BmCluster::start_distributors() } void -BmCluster::create_feed_handlers() +BmCluster::create_feed_handler() { - for (const auto &node : _nodes) { - if (node) { - node->create_feed_handler(_params); + StorageApiRpcService::Params rpc_params; + // This is the same compression config as the default in stor-communicationmanager.def. + rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); + rpc_params.num_rpc_targets_per_node = _params.get_rpc_targets_per_node(); + if (_params.get_use_document_api()) { + _feed_handler = std::make_unique(get_message_bus(), *_distribution); + } else if (_params.get_enable_distributor()) { + if (_params.get_use_storage_chain()) { + auto contexts = collect_storage_link_contexts(_nodes, true); + _feed_handler = std::make_unique(std::move(contexts), *_distribution, true); + } else if (_params.get_use_message_bus()) { + _feed_handler = std::make_unique(get_message_bus(), *_distribution, true); + } else { + _feed_handler = std::make_unique(get_rpc_client(), _repo, rpc_params, *_distribution, true); + } + } else if (_params.needs_service_layer()) { + if (_params.get_use_storage_chain()) { + auto contexts = collect_storage_link_contexts(_nodes, false); + _feed_handler = std::make_unique(std::move(contexts), *_distribution, false); + } else if (_params.get_use_message_bus()) { + _feed_handler = std::make_unique(get_message_bus(), *_distribution, false); + } else { + _feed_handler = std::make_unique(get_rpc_client(), _repo, rpc_params, *_distribution, false); } + } else { + auto providers = collect_persistence_providers(_nodes); + _feed_handler = std::make_unique(std::move(providers), *_field_set_repo, *_distribution, _params.get_skip_get_spi_bucket_info()); } } void -BmCluster::shutdown_feed_handlers() +BmCluster::shutdown_feed_handler() { - for (const auto &node : _nodes) { - if (node) { - node->shutdown_feed_handler(); - } - } + _feed_handler.reset(); } void @@ -363,13 +419,13 @@ BmCluster::start(BmFeed& feed) if (_params.needs_message_bus()) { start_message_bus(); } - create_feed_handlers(); + create_feed_handler(); } void BmCluster::stop() { - shutdown_feed_handlers(); + shutdown_feed_handler(); stop_message_bus(); shutdown_distributors(); shutdown_service_layers(); @@ -378,7 +434,7 @@ BmCluster::stop() IBmFeedHandler* BmCluster::get_feed_handler() { - return get_node(0).get_feed_handler(); + return _feed_handler.get(); } } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h index f1978dfe0bc..db23cd986e0 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h @@ -13,7 +13,12 @@ class ConfigSet; } -namespace document { class DocumentTypeRepo; } +namespace document { + +class DocumentTypeRepo; +class FieldSetRepo; + +} namespace document::internal { class InternalDocumenttypesType; } namespace mbus { class Slobrok; } namespace storage::rpc { class SharedRpcResources; } @@ -23,6 +28,7 @@ namespace search::bmcluster { class BmFeed; class BmMessageBus; class BmNode; +class IBmDistribution; class IBmFeedHandler; /* @@ -46,7 +52,10 @@ class BmCluster { int _base_port; std::shared_ptr _document_types; std::shared_ptr _repo; + std::unique_ptr _field_set_repo; + std::shared_ptr _distribution; std::vector> _nodes; + std::unique_ptr _feed_handler; public: BmCluster(const vespalib::string& base_dir, int base_port, const BmClusterParams& params, std::shared_ptr document_types, std::shared_ptr repo); @@ -60,8 +69,8 @@ public: void stop_rpc_client(); void start_service_layers(); void start_distributors(); - void create_feed_handlers(); - void shutdown_feed_handlers(); + void create_feed_handler(); + void shutdown_feed_handler(); void shutdown_distributors(); void shutdown_service_layers(); void create_buckets(BmFeed &feed); @@ -70,9 +79,9 @@ public: void stop(); storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; } BmMessageBus& get_message_bus() { return *_message_bus; } + const IBmDistribution& get_distribution() { return *_distribution; } void make_node(uint32_t node_idx); void make_nodes(); - BmNode& get_node(uint32_t node_idx); IBmFeedHandler* get_feed_handler(); }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp index e4e85e0273f..38ef1b45254 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bm_cluster_controller.h" +#include "i_bm_distribution.h" #include #include #include @@ -8,7 +9,6 @@ #include #include #include -#include using storage::api::StorageMessageAddress; using storage::rpc::SharedRpcResources; @@ -19,11 +19,9 @@ namespace search::bmcluster { namespace { FRT_RPCRequest * -make_set_cluster_state_request(uint32_t num_nodes) +make_set_cluster_state_request(const IBmDistribution& distribution) { - vespalib::asciistream s; - s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes; - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str())); + storage::lib::ClusterStateBundle bundle(distribution.get_cluster_state_bundle()); storage::rpc::SlimeClusterStateBundleCodec codec; auto encoded_bundle = codec.encode(bundle); auto *req = new FRT_RPCRequest(); @@ -37,9 +35,9 @@ make_set_cluster_state_request(uint32_t num_nodes) } -BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, uint32_t num_nodes) +BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, const IBmDistribution &distribution) : _shared_rpc_resources(shared_rpc_resources_in), - _num_nodes(num_nodes) + _distribution(distribution) { } @@ -48,7 +46,7 @@ BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor) { static vespalib::string _storage("storage"); StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx); - auto req = make_set_cluster_state_request(_num_nodes); + auto req = make_set_cluster_state_request(_distribution); auto target_resolver = std::make_unique(_shared_rpc_resources.slobrok_mirror(), _shared_rpc_resources.target_factory(), 1); uint64_t fake_bucket_id = 0; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h index d3e261ab72f..3e239787b02 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h @@ -9,15 +9,17 @@ namespace storage::rpc { class SharedRpcResources; } namespace search::bmcluster { +class IBmDistribution; + /* * Fake cluster controller that sets cluster state to be up. */ class BmClusterController { storage::rpc::SharedRpcResources& _shared_rpc_resources; - uint32_t _num_nodes; + const IBmDistribution& _distribution; public: - BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, uint32_t num_nodes); + BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, const IBmDistribution& distribution); void set_cluster_up(uint32_t node_idx, bool distributor); }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp new file mode 100644 index 00000000000..44b253b3d35 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp @@ -0,0 +1,98 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_distribution.h" +#include +#include +#include + +using storage::lib::ClusterStateBundle; + +namespace search::bmcluster { + +using DistributionConfigBuilder = BmDistribution::DistributionConfigBuilder; + +namespace { + +BmDistribution::DistributionConfig +make_distribution_config(uint32_t num_nodes) +{ + DistributionConfigBuilder dc; + { + DistributionConfigBuilder::Group group; + { + for (uint32_t i = 0; i < num_nodes; ++i) { + DistributionConfigBuilder::Group::Nodes node; + node.index = i; + group.nodes.push_back(std::move(node)); + } + } + group.index = "invalid"; + group.name = "invalid"; + group.capacity = 1.0; + group.partitions = ""; + dc.group.push_back(std::move(group)); + } + dc.redundancy = 1; + dc.readyCopies = 1; + return dc; +} + +ClusterStateBundle +make_cluster_state_bundle(uint32_t num_nodes) +{ + vespalib::asciistream s; + s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes; + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str())); + return bundle; +} + +} + +BmDistribution::BmDistribution(uint32_t num_nodes) + : _num_nodes(num_nodes), + _distribution_config(make_distribution_config(num_nodes)), + _distribution(_distribution_config), + _cluster_state_bundle(make_cluster_state_bundle(num_nodes)) +{ +} + +BmDistribution::~BmDistribution() +{ +} + +uint32_t +BmDistribution::get_num_nodes() const +{ + return _num_nodes; + +} + +uint32_t +BmDistribution::get_service_layer_node_idx(const document::Bucket& bucket) const +{ + auto cluster_state = _cluster_state_bundle.getDerivedClusterState(bucket.getBucketSpace()); + auto nodes = _distribution.getIdealStorageNodes(*cluster_state, bucket.getBucketId()); + assert(!nodes.empty()); + return nodes[0]; +} + +uint32_t +BmDistribution::get_distributor_node_idx(const document::Bucket& bucket) const +{ + auto cluster_state = _cluster_state_bundle.getDerivedClusterState(bucket.getBucketSpace()); + return _distribution.getIdealDistributorNode(*cluster_state, bucket.getBucketId()); +} + +BmDistribution::DistributionConfig +BmDistribution::get_distribution_config() const +{ + return _distribution_config; +} + +ClusterStateBundle +BmDistribution::get_cluster_state_bundle() const +{ + return _cluster_state_bundle; +} + +}; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h new file mode 100644 index 00000000000..fde89a0d766 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_distribution.h" +#include +#include +#include +#include + +namespace search::bmcluster { + +/* + * Class for describing cluster toplogy and how messages are + * routed from feeders into the cluster. + */ +class BmDistribution : public IBmDistribution +{ + uint32_t _num_nodes; + DistributionConfigBuilder _distribution_config; + storage::lib::Distribution _distribution; + storage::lib::ClusterStateBundle _cluster_state_bundle; + +public: + BmDistribution(uint32_t num_nodes); + ~BmDistribution() override; + uint32_t get_num_nodes() const override; + uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const override; + uint32_t get_distributor_node_idx(const document::Bucket & bucket) const override; + DistributionConfig get_distribution_config() const override; + storage::lib::ClusterStateBundle get_cluster_state_bundle() const override; +}; + +}; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp new file mode 100644 index 00000000000..95131118368 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_message_bus_routes.h" +#include + +namespace search::bmcluster { + +BmMessageBusRoutes::BmMessageBusRoutes(uint32_t num_nodes, bool distributor) + : BmStorageMessageAddresses(num_nodes, distributor), + _routes(num_nodes) +{ + for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) { + _routes[node_idx] = get_address(node_idx).to_mbus_route(); + } +} + +BmMessageBusRoutes::~BmMessageBusRoutes() = default; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h new file mode 100644 index 00000000000..95116f71345 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h @@ -0,0 +1,24 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bm_storage_message_addresses.h" +#include + +namespace search::bmcluster { + +/* + * Class containing the message bus routes for a set of nodes at + * the given layer (service layer or ditributor). + */ +class BmMessageBusRoutes : public BmStorageMessageAddresses +{ + std::vector _routes; +public: + BmMessageBusRoutes(uint32_t num_nodes, bool distributor); + ~BmMessageBusRoutes(); + const mbus::Route& get_route(uint32_t node_idx) const { return _routes[node_idx]; } + bool has_route(uint32_t node_idx) const { return node_idx < _routes.size(); } +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index bc407321937..bc84812e669 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -10,6 +10,7 @@ #include "storage_api_message_bus_bm_feed_handler.h" #include "storage_api_rpc_bm_feed_handler.h" #include "document_api_message_bus_bm_feed_handler.h" +#include "i_bm_distribution.h" #include "i_bm_feed_handler.h" #include "spi_bm_feed_handler.h" #include @@ -26,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -279,7 +279,7 @@ struct StorageConfigSet SlobroksConfigBuilder slobroks; MessagebusConfigBuilder messagebus; - StorageConfigSet(const vespalib::string &base_dir, uint32_t node_idx, bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + StorageConfigSet(const vespalib::string &base_dir, uint32_t node_idx, bool distributor, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) : config_id(config_id_in), documenttypes(documenttypes_in), @@ -296,26 +296,7 @@ struct StorageConfigSet slobroks(), messagebus() { - { - auto& dc = stor_distribution; - { - StorDistributionConfigBuilder::Group group; - { - for (uint32_t i = 0; i < params.get_num_nodes(); ++i) { - StorDistributionConfigBuilder::Group::Nodes node; - node.index = i; - group.nodes.push_back(std::move(node)); - } - } - group.index = "invalid"; - group.name = "invalid"; - group.capacity = 1.0; - group.partitions = ""; - dc.group.push_back(std::move(group)); - } - dc.redundancy = 1; - dc.readyCopies = 1; - } + stor_distribution = distribution.get_distribution_config(); stor_server.nodeIndex = node_idx; stor_server.isDistributor = distributor; stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits(); @@ -364,9 +345,9 @@ struct ServiceLayerConfigSet : public StorageConfigSet StorBucketInitConfigBuilder stor_bucket_init; StorVisitorConfigBuilder stor_visitor; - ServiceLayerConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + ServiceLayerConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) - : StorageConfigSet(base_dir, node_idx, false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), + : StorageConfigSet(base_dir, node_idx, false, config_id_in, distribution, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), persistence(), stor_filestor(), stor_bucket_init(), @@ -395,9 +376,9 @@ struct DistributorConfigSet : public StorageConfigSet StorDistributormanagerConfigBuilder stor_distributormanager; StorVisitordispatcherConfigBuilder stor_visitordispatcher; - DistributorConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, + DistributorConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params) - : StorageConfigSet(base_dir, node_idx, true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), + : StorageConfigSet(base_dir, node_idx, true, config_id_in, distribution, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params), stor_distributormanager(), stor_visitordispatcher() { @@ -451,12 +432,10 @@ class MyBmNode : public BmNode MyResourceWriteFilter _write_filter; proton::test::DiskMemUsageNotifier _disk_mem_usage_notifier; std::shared_ptr _persistence_engine; - std::unique_ptr _field_set_repo; ServiceLayerConfigSet _service_layer_config; DistributorConfigSet _distributor_config; ConfigSet _config_set; std::shared_ptr _config_context; - std::unique_ptr _feed_handler; std::unique_ptr _slobrok; std::shared_ptr _service_layer_chain_context; std::unique_ptr _service_layer; @@ -472,13 +451,11 @@ public: void start_service_layer(const BmClusterParams& params) override; void wait_service_layer() override; void start_distributor(const BmClusterParams& params) override; - void create_feed_handler(const BmClusterParams& params) override; - void shutdown_feed_handler() override; void shutdown_distributor() override; void shutdown_service_layer() override; void wait_service_layer_slobrok() override; void wait_distributor_slobrok() override; - IBmFeedHandler* get_feed_handler() override; + std::shared_ptr get_storage_link_context(bool distributor) override; PersistenceProvider* get_persistence_provider() override; }; @@ -514,12 +491,10 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _write_filter(), _disk_mem_usage_notifier(), _persistence_engine(), - _field_set_repo(std::make_unique(*_repo)), - _service_layer_config(_base_dir, _node_idx, "bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), - _distributor_config(_base_dir, _node_idx, "bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), + _service_layer_config(_base_dir, _node_idx, "bm-servicelayer", cluster.get_distribution(), *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), + _distributor_config(_base_dir, _node_idx, "bm-distributor", cluster.get_distribution(), *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), _config_set(), _config_context(std::make_shared(_config_set)), - _feed_handler(), _slobrok(), _service_layer_chain_context(), _service_layer(), @@ -532,7 +507,6 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); _service_layer_config.add_builders(_config_set); _distributor_config.add_builders(_config_set); - _feed_handler = std::make_unique(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info()); } MyBmNode::~MyBmNode() @@ -635,42 +609,6 @@ MyBmNode::start_distributor(const BmClusterParams& params) _distributor->createNode(); } -void -MyBmNode::create_feed_handler(const BmClusterParams& params) -{ - StorageApiRpcService::Params rpc_params; - // This is the same compression config as the default in stor-communicationmanager.def. - rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); - rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node(); - if (params.get_use_document_api()) { - _feed_handler = std::make_unique(_cluster.get_message_bus()); - } else if (params.get_enable_distributor()) { - if (params.get_use_storage_chain()) { - assert(_distributor_chain_context); - _feed_handler = std::make_unique(_distributor_chain_context, true); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique(_cluster.get_message_bus(), true); - } else { - _feed_handler = std::make_unique(_cluster.get_rpc_client(), _repo, rpc_params, true); - } - } else if (params.needs_service_layer()) { - if (params.get_use_storage_chain()) { - assert(_service_layer_chain_context); - _feed_handler = std::make_unique(_service_layer_chain_context, false); - } else if (params.get_use_message_bus()) { - _feed_handler = std::make_unique(_cluster.get_message_bus(), false); - } else { - _feed_handler = std::make_unique(_cluster.get_rpc_client(), _repo, rpc_params, false); - } - } -} - -void -MyBmNode::shutdown_feed_handler() -{ - _feed_handler.reset(); -} - void MyBmNode::shutdown_distributor() { @@ -691,10 +629,10 @@ MyBmNode::shutdown_service_layer() } } -IBmFeedHandler* -MyBmNode::get_feed_handler() +std::shared_ptr +MyBmNode::get_storage_link_context(bool distributor) { - return _feed_handler.get(); + return distributor ? _distributor_chain_context : _service_layer_chain_context; } PersistenceProvider* diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h index 2287ac90dec..4bb0d63404a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h @@ -22,7 +22,9 @@ namespace search::bmcluster { class BmCluster; class BmClusterParams; +struct BmStorageLinkContext; class IBmFeedHandler; +class IBMDistribution; /* * Class representing a single benchmark node in a benchmark cluster. @@ -38,13 +40,11 @@ public: virtual void start_service_layer(const BmClusterParams& params) = 0; virtual void wait_service_layer() = 0; virtual void start_distributor(const BmClusterParams& params) = 0; - virtual void create_feed_handler(const BmClusterParams& params) = 0; - virtual void shutdown_feed_handler() = 0; virtual void shutdown_distributor() = 0; virtual void shutdown_service_layer() = 0; virtual void wait_service_layer_slobrok() = 0; virtual void wait_distributor_slobrok() = 0; - virtual IBmFeedHandler* get_feed_handler() = 0; + virtual std::shared_ptr get_storage_link_context(bool distributor) = 0; virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0; static unsigned int num_ports(); static std::unique_ptr create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr document_types, int slobrok_port); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp new file mode 100644 index 00000000000..45145a6f65c --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bm_storage_message_addresses.h" +#include + +using storage::api::StorageMessageAddress; +using storage::lib::NodeType; + +namespace search::bmcluster { + +namespace { + +vespalib::string _Storage("storage"); + +} + +BmStorageMessageAddresses::BmStorageMessageAddresses(uint32_t num_nodes, bool distributor) + : _addresses(num_nodes) +{ + for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) { + _addresses[node_idx] = std::make_unique(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx); + } +} + +BmStorageMessageAddresses::~BmStorageMessageAddresses() = default; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h new file mode 100644 index 00000000000..93b0ed56168 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h @@ -0,0 +1,28 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include + +namespace storage::api { class StorageMessageAddress; } + +namespace search::bmcluster { + +/* + * Class representing the storage message addresses for a set of nodes at + * the given layer (service layer or ditributor). + */ +class BmStorageMessageAddresses +{ + using StorageMessageAddress = storage::api::StorageMessageAddress; + std::vector> _addresses; +public: + BmStorageMessageAddresses(uint32_t num_nodes, bool distributor); + ~BmStorageMessageAddresses(); + const StorageMessageAddress &get_address(uint32_t node_idx) const { return *_addresses[node_idx]; } + bool has_address(uint32_t node_idx) const { return node_idx < _addresses.size(); } + +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp index 6670707ed39..4de1f21b86f 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp @@ -5,10 +5,9 @@ namespace search::bmcluster { -BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic& errors) +BucketInfoQueue::BucketInfoQueue(std::atomic& errors) : _mutex(), - _buckets(), - _provider(provider), + _pending_get_bucket_infos(), _errors(errors) { } @@ -22,11 +21,11 @@ void BucketInfoQueue::get_bucket_info_loop() { std::unique_lock guard(_mutex); - while (!_buckets.empty()) { - auto bucket = _buckets.front(); - _buckets.pop_front(); + while (!_pending_get_bucket_infos.empty()) { + auto pending_get_bucket_info = _pending_get_bucket_infos.front(); + _pending_get_bucket_infos.pop_front(); guard.unlock(); - auto bucket_info = _provider.getBucketInfo(bucket); + auto bucket_info = pending_get_bucket_info.second->getBucketInfo(pending_get_bucket_info.first); if (bucket_info.hasError()) { ++_errors; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h index 1a48f9fa478..0cc66a54fed 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h @@ -18,19 +18,19 @@ namespace search::bmcluster { */ class BucketInfoQueue { + using PendingGetBucketInfo = std::pair; std::mutex _mutex; - std::deque _buckets; - storage::spi::PersistenceProvider& _provider; + std::deque _pending_get_bucket_infos; std::atomic& _errors; public: - BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic& errors); + BucketInfoQueue(std::atomic& errors); ~BucketInfoQueue(); - void put_bucket(storage::spi::Bucket bucket) + void put_bucket(storage::spi::Bucket bucket, storage::spi::PersistenceProvider* provider) { std::lock_guard guard(_mutex); - _buckets.emplace_back(std::move(bucket)); + _pending_get_bucket_infos.emplace_back(std::make_pair(std::move(bucket), provider)); } void get_bucket_info_loop(); diff --git a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp index c6f2626f27c..8e5c65ac5b6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "document_api_message_bus_bm_feed_handler.h" #include "bm_message_bus.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include #include @@ -19,53 +20,56 @@ using storage::lib::NodeType; namespace search::bmcluster { -namespace { - vespalib::string _Storage("storage"); -} - -DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus) +DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution) : IBmFeedHandler(), _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), - _storage_address(std::make_unique(&_Storage, NodeType::DISTRIBUTOR, 0)), _message_bus(message_bus), - _route(_storage_address->to_mbus_route()) + _routes(distribution.get_num_nodes(), true), + _no_route_error_count(0u), + _distribution(distribution) { } DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default; void -DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr msg, PendingTracker& pending_tracker) +DocumentApiMessageBusBmFeedHandler::send_msg(const document::Bucket& bucket, std::unique_ptr msg, PendingTracker& pending_tracker) { - _message_bus.send_msg(std::move(msg), _route, pending_tracker); + uint32_t node_idx = _distribution.get_distributor_node_idx(bucket); + if (_routes.has_route(node_idx)) { + auto &route = _routes.get_route(node_idx); + _message_bus.send_msg(std::move(msg), route, pending_tracker); + } else { + ++_no_route_error_count; + } } void -DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr document, uint64_t, PendingTracker& tracker) +DocumentApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr document, uint64_t, PendingTracker& tracker) { auto msg = std::make_unique(std::move(document)); - send_msg(std::move(msg), tracker); + send_msg(bucket, std::move(msg), tracker); } void -DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr document_update, uint64_t, PendingTracker& tracker) +DocumentApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t, PendingTracker& tracker) { auto msg = std::make_unique(std::move(document_update)); - send_msg(std::move(msg), tracker); + send_msg(bucket, std::move(msg), tracker); } void -DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker) +DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t, PendingTracker& tracker) { auto msg = std::make_unique(document_id); - send_msg(std::move(msg), tracker); + send_msg(bucket, std::move(msg), tracker); } void -DocumentApiMessageBusBmFeedHandler::get(const document::Bucket&, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) +DocumentApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) { auto msg = std::make_unique(document_id, field_set_string); - send_msg(std::move(msg), tracker); + send_msg(bucket, std::move(msg), tracker); } void @@ -76,7 +80,7 @@ DocumentApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&) uint32_t DocumentApiMessageBusBmFeedHandler::get_error_count() const { - return _message_bus.get_error_count(); + return _message_bus.get_error_count() + _no_route_error_count; } const vespalib::string& diff --git a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h index 5358e0a948b..907c18a309a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h @@ -3,15 +3,15 @@ #pragma once #include "i_bm_feed_handler.h" -#include +#include "bm_message_bus_routes.h" +#include -namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; -namespace storage::api { class StorageMessageAddress; } namespace search::bmcluster { class BmMessageBus; +class IBmDistribution; /* * Benchmark feed handler for feed to distributor using document api protocol @@ -19,13 +19,14 @@ class BmMessageBus; */ class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler { - vespalib::string _name; - std::unique_ptr _storage_address; - BmMessageBus& _message_bus; - mbus::Route _route; - void send_msg(std::unique_ptr msg, PendingTracker& tracker); + vespalib::string _name; + BmMessageBus& _message_bus; + BmMessageBusRoutes _routes; + std::atomic _no_route_error_count; + const IBmDistribution& _distribution; + void send_msg(const document::Bucket& bucket, std::unique_ptr msg, PendingTracker& tracker); public: - DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); + DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution); ~DocumentApiMessageBusBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) override; diff --git a/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h new file mode 100644 index 00000000000..f2c18767d1d --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include + +namespace document { class Bucket; } + +namespace vespa::config::content::internal { + class InternalStorDistributionType; +} + +namespace storage::lib { class ClusterStateBundle; } + +namespace search::bmcluster { + +/* + * Interface class for describing cluster toplogy and how messages are + * routed from feeders into the cluster. + */ +class IBmDistribution { +public: + using DistributionConfigBuilder = vespa::config::content::internal::InternalStorDistributionType; + using DistributionConfig = const DistributionConfigBuilder; + + virtual ~IBmDistribution() = default; + virtual uint32_t get_num_nodes() const = 0; + virtual uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const = 0; + virtual uint32_t get_distributor_node_idx(const document::Bucket & bucket) const = 0; + virtual DistributionConfig get_distribution_config() const = 0; + virtual storage::lib::ClusterStateBundle get_cluster_state_bundle() const = 0; +}; + +}; diff --git a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp index 247bf8bece3..bb19bcaefdf 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp @@ -47,9 +47,9 @@ PendingTracker::drain() } void -PendingTracker::attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic& errors) +PendingTracker::attach_bucket_info_queue(std::atomic& errors) { - _bucket_info_queue = std::make_unique(provider, errors); + _bucket_info_queue = std::make_unique(errors); } } diff --git a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h index a8fa2f77396..cb8dc823b9b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h +++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h @@ -5,8 +5,6 @@ #include #include -namespace storage::spi { struct PersistenceProvider; } - namespace search::bmcluster { class BucketInfoQueue; @@ -30,7 +28,7 @@ public: void retain(); void drain(); - void attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic& errors); + void attach_bucket_info_queue(std::atomic& errors); BucketInfoQueue *get_bucket_info_queue() { return _bucket_info_queue.get(); } }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp index e905b493cf4..e0822db6f0c 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "spi_bm_feed_handler.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include "bucket_info_queue.h" #include @@ -31,18 +32,20 @@ void get_bucket_info_loop(PendingTracker &tracker) class MyOperationComplete : public storage::spi::OperationComplete { + PersistenceProvider* _provider; std::atomic &_errors; Bucket _bucket; PendingTracker& _tracker; public: - MyOperationComplete(std::atomic &errors, const Bucket& bucket, PendingTracker& tracker); + MyOperationComplete(PersistenceProvider* provider, std::atomic &errors, const Bucket& bucket, PendingTracker& tracker); ~MyOperationComplete() override; void onComplete(std::unique_ptr result) override; void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; }; -MyOperationComplete::MyOperationComplete(std::atomic &errors, const Bucket& bucket, PendingTracker& tracker) - : _errors(errors), +MyOperationComplete::MyOperationComplete(PersistenceProvider* provider, std::atomic &errors, const Bucket& bucket, PendingTracker& tracker) + : _provider(provider), + _errors(errors), _bucket(bucket), _tracker(tracker) { @@ -62,7 +65,7 @@ MyOperationComplete::onComplete(std::unique_ptr result) } else { auto bucket_info_queue = _tracker.get_bucket_info_queue(); if (bucket_info_queue != nullptr) { - bucket_info_queue->put_bucket(_bucket); + bucket_info_queue->put_bucket(_bucket, _provider); } } } @@ -75,50 +78,81 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result } -SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info) +SpiBmFeedHandler::SpiBmFeedHandler(std::vectorproviders, const document::FieldSetRepo &field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info) : IBmFeedHandler(), _name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"), - _provider(provider), + _providers(std::move(providers)), _field_set_repo(field_set_repo), _errors(0u), - _skip_get_spi_bucket_info(skip_get_spi_bucket_info) + _skip_get_spi_bucket_info(skip_get_spi_bucket_info), + _distribution(distribution) { } SpiBmFeedHandler::~SpiBmFeedHandler() = default; +PersistenceProvider* +SpiBmFeedHandler::get_provider(const document::Bucket& bucket) +{ + uint32_t node_idx = _distribution.get_service_layer_node_idx(bucket); + if (node_idx >= _providers.size()) { + return nullptr; + } + return _providers[node_idx]; +} + void SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique(_errors, spi_bucket, tracker)); + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique(provider, _errors, spi_bucket, tracker)); + } else { + ++_errors; + } } void SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) { get_bucket_info_loop(tracker); - Bucket spi_bucket(bucket); - auto field_set = _field_set_repo.getFieldSet(field_set_string); - auto result = _provider.get(spi_bucket, *field_set, document_id, context); - if (result.hasError()) { + auto provider = get_provider(bucket); + if (provider) { + Bucket spi_bucket(bucket); + auto field_set = _field_set_repo.getFieldSet(field_set_string); + auto result = provider->get(spi_bucket, *field_set, document_id, context); + if (result.hasError()) { + ++_errors; + } + } else { ++_errors; } } @@ -127,7 +161,7 @@ void SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker) { if (!_skip_get_spi_bucket_info) { - tracker.attach_bucket_info_queue(_provider, _errors); + tracker.attach_bucket_info_queue(_errors); } } diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h index bbc9e3b8e74..07ef0a21e5b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h @@ -4,24 +4,30 @@ #include "i_bm_feed_handler.h" #include +#include namespace document { class FieldSetRepo; } namespace storage::spi { struct PersistenceProvider; } namespace search::bmcluster { +class IBmDistribution; + /* * Benchmark feed handler for feed directly to persistence provider */ class SpiBmFeedHandler : public IBmFeedHandler { - vespalib::string _name; - storage::spi::PersistenceProvider& _provider; - const document::FieldSetRepo& _field_set_repo; - std::atomic _errors; - bool _skip_get_spi_bucket_info; + vespalib::string _name; + std::vector _providers; + const document::FieldSetRepo& _field_set_repo; + std::atomic _errors; + bool _skip_get_spi_bucket_info; + const IBmDistribution& _distribution; + + storage::spi::PersistenceProvider* get_provider(const document::Bucket &bucket); public: - SpiBmFeedHandler(storage::spi::PersistenceProvider& provider, const document::FieldSetRepo& field_set_repo, bool skip_get_spi_bucket_info); + SpiBmFeedHandler(std::vector providers, const document::FieldSetRepo& field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info); ~SpiBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) override; diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp index 34669b8cbdc..f7df593a75e 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storage_api_chain_bm_feed_handler.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include "storage_reply_error_checker.h" #include "bm_storage_link_context.h" @@ -20,27 +21,14 @@ using document::DocumentUpdate; namespace search::bmcluster { -namespace { - -std::shared_ptr make_set_cluster_state_cmd() { - storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); - auto cmd = std::make_shared(bundle); - cmd->setPriority(storage::api::StorageMessage::VERYHIGH); - return cmd; -} - -} - -StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr context, bool distributor) +StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::vector> contexts, const IBmDistribution& distribution, bool distributor) : IBmFeedHandler(), _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), - _context(std::move(context)) + _contexts(std::move(contexts)), + _no_link_error_count(0u), + _distribution(distribution) { - auto cmd = make_set_cluster_state_cmd(); - PendingTracker tracker(1); - send_msg(std::move(cmd), tracker); - tracker.drain(); } StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; @@ -48,10 +36,20 @@ StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; void StorageApiChainBmFeedHandler::send_msg(std::shared_ptr cmd, PendingTracker& pending_tracker) { - cmd->setSourceIndex(0); - auto bm_link = _context->bm_link; - bm_link->retain(cmd->getMsgId(), pending_tracker); - bm_link->sendDown(std::move(cmd)); + auto bucket = cmd->getBucket(); + if (_distributor) { + cmd->setSourceIndex(0); + } else { + cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); + } + uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + if (node_idx < _contexts.size() && _contexts[node_idx]) { + auto bm_link = _contexts[node_idx]->bm_link; + bm_link->retain(cmd->getMsgId(), pending_tracker); + bm_link->sendDown(std::move(cmd)); + } else { + ++_no_link_error_count; + } } void @@ -90,7 +88,14 @@ StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&) uint32_t StorageApiChainBmFeedHandler::get_error_count() const { - return _context->bm_link->get_error_count(); + uint32_t error_count = 0; + for (auto &context : _contexts) { + if (context) { + error_count += context->bm_link->get_error_count(); + } + } + error_count += _no_link_error_count; + return error_count; } const vespalib::string& diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h index 1c196d746eb..d4ba06403e3 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h @@ -3,12 +3,15 @@ #pragma once #include "i_bm_feed_handler.h" +#include +#include namespace storage::api { class StorageCommand; } namespace search::bmcluster { struct BmStorageLinkContext; +class IBmDistribution; /* * Benchmark feed handler for feed to service layer or distributor @@ -17,12 +20,15 @@ struct BmStorageLinkContext; class StorageApiChainBmFeedHandler : public IBmFeedHandler { private: - vespalib::string _name; - bool _distributor; - std::shared_ptr _context; + vespalib::string _name; + bool _distributor; + std::vector> _contexts; + std::atomic _no_link_error_count; + const IBmDistribution& _distribution; + void send_msg(std::shared_ptr cmd, PendingTracker& tracker); public: - StorageApiChainBmFeedHandler(std::shared_ptr context, bool distributor); + StorageApiChainBmFeedHandler(std::vector> contexts, const IBmDistribution& distribution, bool distributor); ~StorageApiChainBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) override; diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp index 04561b5d93e..4f84be508db 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "storage_api_message_bus_bm_feed_handler.h" #include "bm_message_bus.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include #include @@ -17,16 +18,14 @@ using storage::lib::NodeType; namespace search::bmcluster { -namespace { - vespalib::string _Storage("storage"); -} -StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor) +StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor) : IBmFeedHandler(), _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), - _storage_address(std::make_unique(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), _message_bus(message_bus), - _route(_storage_address->to_mbus_route()) + _routes(distribution.get_num_nodes(), distributor), + _no_route_error_count(0), + _distribution(distribution) { } @@ -35,9 +34,20 @@ StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = defaul void StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr cmd, PendingTracker& pending_tracker) { - cmd->setSourceIndex(0); + auto bucket = cmd->getBucket(); + if (_distributor) { + cmd->setSourceIndex(0); + } else { + cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); + } auto msg = std::make_unique(cmd); - _message_bus.send_msg(std::move(msg), _route, pending_tracker); + uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + if (_routes.has_route(node_idx)) { + auto& route = _routes.get_route(node_idx); + _message_bus.send_msg(std::move(msg), route, pending_tracker); + } else { + ++_no_route_error_count; + } } void @@ -76,7 +86,7 @@ StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&) uint32_t StorageApiMessageBusBmFeedHandler::get_error_count() const { - return _message_bus.get_error_count(); + return _message_bus.get_error_count() + _no_route_error_count; } const vespalib::string& diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h index 0027f260b8f..c02e594964a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h @@ -3,18 +3,15 @@ #pragma once #include "i_bm_feed_handler.h" -#include +#include "bm_message_bus_routes.h" +#include -namespace document { class DocumentTypeRepo; } -namespace documentapi { class DocumentMessage; }; -namespace storage::api { -class StorageCommand; -class StorageMessageAddress; -} +namespace storage::api { class StorageCommand; } namespace search::bmcluster { class BmMessageBus; +class IBmDistribution; /* * Benchmark feed handler for feed to service layer or distributor @@ -22,14 +19,15 @@ class BmMessageBus; */ class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler { - vespalib::string _name; - bool _distributor; - std::unique_ptr _storage_address; - BmMessageBus& _message_bus; - mbus::Route _route; + vespalib::string _name; + bool _distributor; + BmMessageBus& _message_bus; + BmMessageBusRoutes _routes; + std::atomic _no_route_error_count; + const IBmDistribution& _distribution; void send_msg(std::shared_ptr cmd, PendingTracker& tracker); public: - StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); + StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor); ~StorageApiMessageBusBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) override; diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp index 3e0426cb308..57d50853c71 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storage_api_rpc_bm_feed_handler.h" +#include "i_bm_distribution.h" #include "pending_tracker.h" #include "pending_tracker_hash.h" #include "storage_reply_error_checker.h" @@ -24,10 +25,6 @@ using storage::lib::NodeType; namespace search::bmcluster { -namespace { - vespalib::string _Storage("storage"); -} - class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, public StorageReplyErrorChecker { @@ -66,15 +63,18 @@ StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr repo, const StorageApiRpcService::Params& rpc_params, + const IBmDistribution& distribution, bool distributor) : IBmFeedHandler(), _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), - _storage_address(std::make_unique(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), + _addresses(distribution.get_num_nodes(), distributor), + _no_address_error_count(0u), _shared_rpc_resources(shared_rpc_resources_in), _message_dispatcher(std::make_unique()), _message_codec_provider(std::make_unique(repo)), - _rpc_client(std::make_unique(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)) + _rpc_client(std::make_unique(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)), + _distribution(distribution) { } @@ -83,10 +83,20 @@ StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; void StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr cmd, PendingTracker& pending_tracker) { - cmd->setSourceIndex(0); - cmd->setAddress(*_storage_address); - _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); - _rpc_client->send_rpc_v1_request(std::move(cmd)); + auto bucket = cmd->getBucket(); + if (_distributor) { + cmd->setSourceIndex(0); + } else { + cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); + } + uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + if (_addresses.has_address(node_idx)) { + cmd->setAddress(_addresses.get_address(node_idx)); + _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); + _rpc_client->send_rpc_v1_request(std::move(cmd)); + } else { + ++_no_address_error_count; + } } void @@ -125,7 +135,7 @@ StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&) uint32_t StorageApiRpcBmFeedHandler::get_error_count() const { - return _message_dispatcher->get_error_count(); + return _message_dispatcher->get_error_count() + _no_address_error_count; } const vespalib::string& diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h index 360f702e590..48b6233d0a4 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include "bm_storage_message_addresses.h" #include namespace document { class DocumentTypeRepo; } @@ -18,6 +19,8 @@ class SharedRpcResources; namespace search::bmcluster { +class IBmDistribution; + /* * Benchmark feed handler for feed to service layer or distributor * using storage api protocol over rpc. @@ -27,17 +30,20 @@ class StorageApiRpcBmFeedHandler : public IBmFeedHandler class MyMessageDispatcher; vespalib::string _name; bool _distributor; - std::unique_ptr _storage_address; - storage::rpc::SharedRpcResources& _shared_rpc_resources; - std::unique_ptr _message_dispatcher; + BmStorageMessageAddresses _addresses; + std::atomic _no_address_error_count; + storage::rpc::SharedRpcResources& _shared_rpc_resources; + std::unique_ptr _message_dispatcher; std::unique_ptr _message_codec_provider; std::unique_ptr _rpc_client; + const IBmDistribution& _distribution; void send_rpc(std::shared_ptr cmd, PendingTracker& tracker); public: StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr repo, const storage::rpc::StorageApiRpcService::Params& rpc_params, + const IBmDistribution& distribution, bool distributor); ~StorageApiRpcBmFeedHandler(); void put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) override; diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h index 2fcb6aad14a..62a7190b14a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h @@ -8,6 +8,9 @@ namespace storage::api { class StorageMessage; } namespace search::bmcluster { +/* + * Class tracking errors in storage replies. + */ class StorageReplyErrorChecker { protected: std::atomic _errors; -- cgit v1.2.3