aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-14 13:05:56 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-14 13:05:56 +0200
commitfc89590cc955e6ef2cde7cb44c833a561b14806b (patch)
tree4bc2bb04ea9ce9f2daf4771a7cf7743c13852793
parentda7bfb94df3cfc2975513f205ebfed62415a03e4 (diff)
Move ownership of feed handlers from BmNode to BmCluster.
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp110
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h17
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h6
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp98
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h34
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h24
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp88
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.h6
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h28
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h10
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h19
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h34
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp70
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h18
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp49
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h14
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h24
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h3
29 files changed, 599 insertions, 249 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 8d6dbd0f938..44e13c5954e 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -4,11 +4,14 @@ vespa_add_library(searchcore_bmcluster STATIC
bm_cluster.cpp
bm_cluster_controller.cpp
bm_cluster_params.cpp
+ bm_distribution.cpp
bm_feed.cpp
bm_message_bus.cpp
+ bm_message_bus_routes.cpp
bm_node.cpp
bm_storage_chain_builder.cpp
bm_storage_link.cpp
+ bm_storage_message_addresses.cpp
bucket_info_queue.cpp
document_api_message_bus_bm_feed_handler.cpp
pending_tracker.cpp
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
index 23f3f6d1e30..b7886a08325 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -2,11 +2,17 @@
#include "bm_cluster.h"
#include "bm_cluster_controller.h"
+#include "bm_distribution.h"
#include "bm_feed.h"
#include "bm_message_bus.h"
#include "bm_node.h"
+#include "document_api_message_bus_bm_feed_handler.h"
#include "spi_bm_feed_handler.h"
+#include "storage_api_chain_bm_feed_handler.h"
+#include "storage_api_message_bus_bm_feed_handler.h"
+#include "storage_api_rpc_bm_feed_handler.h"
#include <vespa/config/common/configcontext.h>
+#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/messagebus/config-messagebus.h>
#include <vespa/messagebus/testlib/slobrok.h>
@@ -23,6 +29,9 @@ using cloud::config::SlobroksConfigBuilder;
using config::ConfigSet;
using messagebus::MessagebusConfigBuilder;
using storage::rpc::SharedRpcResources;
+using storage::rpc::StorageApiRpcService;
+using storage::spi::PersistenceProvider;
+using vespalib::compression::CompressionConfig;
namespace search::bmcluster {
@@ -51,6 +60,34 @@ make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
slobroks.slobrok.push_back(std::move(slobrok));
}
+std::vector<std::shared_ptr<BmStorageLinkContext>>
+collect_storage_link_contexts(const std::vector<std::unique_ptr<BmNode>> &nodes, bool distributor)
+{
+ std::vector<std::shared_ptr<BmStorageLinkContext>> contexts;
+ for (auto& node : nodes) {
+ if (node) {
+ contexts.emplace_back(node->get_storage_link_context(distributor));
+ } else {
+ contexts.emplace_back();
+ }
+ }
+ return contexts;
+}
+
+std::vector<PersistenceProvider *>
+collect_persistence_providers(const std::vector<std::unique_ptr<BmNode>> &nodes)
+{
+ std::vector<PersistenceProvider *> providers;
+ for (auto& node : nodes) {
+ if (node) {
+ providers.emplace_back(node->get_persistence_provider());
+ } else {
+ providers.emplace_back(nullptr);
+ }
+ }
+ return providers;
+}
+
}
struct BmCluster::MessageBusConfigSet {
@@ -109,8 +146,10 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl
_base_port(base_port),
_document_types(std::move(document_types)),
_repo(std::move(repo)),
- _nodes(params.get_num_nodes())
-
+ _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
+ _distribution(std::make_shared<const BmDistribution>(params.get_num_nodes())),
+ _nodes(params.get_num_nodes()),
+ _feed_handler()
{
_message_bus_config->add_builders(*_config_set);
_rpc_client_config->add_builders(*_config_set);
@@ -221,14 +260,6 @@ BmCluster::make_nodes()
}
}
-BmNode&
-BmCluster::get_node(uint32_t node_idx)
-{
- assert(node_idx < _nodes.size());
- assert(_nodes[node_idx]);
- return *_nodes[node_idx];
-}
-
void
BmCluster::initialize_providers()
{
@@ -244,9 +275,15 @@ void
BmCluster::create_buckets(BmFeed& feed)
{
LOG(info, "create %u buckets", feed.num_buckets());
- auto& node = get_node(0);
for (unsigned int i = 0; i < feed.num_buckets(); ++i) {
- node.create_bucket(feed.make_bucket(i));
+ auto bucket = feed.make_bucket(i);
+ uint32_t node_idx = _distribution->get_service_layer_node_idx(bucket);
+ if (node_idx < _nodes.size()) {
+ auto& node = _nodes[node_idx];
+ if (node) {
+ node->create_bucket(feed.make_bucket(i));
+ }
+ }
}
}
@@ -270,7 +307,7 @@ BmCluster::start_service_layers()
node->wait_service_layer_slobrok();
}
}
- BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes());
+ BmClusterController fake_controller(get_rpc_client(), *_distribution);
uint32_t node_idx = 0;
for (const auto &node : _nodes) {
if (node) {
@@ -293,7 +330,7 @@ BmCluster::start_distributors()
node->wait_distributor_slobrok();
}
}
- BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes());
+ BmClusterController fake_controller(get_rpc_client(), *_distribution);
uint32_t node_idx = 0;
for (const auto &node : _nodes) {
if (node) {
@@ -306,23 +343,42 @@ BmCluster::start_distributors()
}
void
-BmCluster::create_feed_handlers()
+BmCluster::create_feed_handler()
{
- for (const auto &node : _nodes) {
- if (node) {
- node->create_feed_handler(_params);
+ StorageApiRpcService::Params rpc_params;
+ // This is the same compression config as the default in stor-communicationmanager.def.
+ rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
+ rpc_params.num_rpc_targets_per_node = _params.get_rpc_targets_per_node();
+ if (_params.get_use_document_api()) {
+ _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(get_message_bus(), *_distribution);
+ } else if (_params.get_enable_distributor()) {
+ if (_params.get_use_storage_chain()) {
+ auto contexts = collect_storage_link_contexts(_nodes, true);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(contexts), *_distribution, true);
+ } else if (_params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(get_message_bus(), *_distribution, true);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(get_rpc_client(), _repo, rpc_params, *_distribution, true);
+ }
+ } else if (_params.needs_service_layer()) {
+ if (_params.get_use_storage_chain()) {
+ auto contexts = collect_storage_link_contexts(_nodes, false);
+ _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(contexts), *_distribution, false);
+ } else if (_params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(get_message_bus(), *_distribution, false);
+ } else {
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(get_rpc_client(), _repo, rpc_params, *_distribution, false);
}
+ } else {
+ auto providers = collect_persistence_providers(_nodes);
+ _feed_handler = std::make_unique<SpiBmFeedHandler>(std::move(providers), *_field_set_repo, *_distribution, _params.get_skip_get_spi_bucket_info());
}
}
void
-BmCluster::shutdown_feed_handlers()
+BmCluster::shutdown_feed_handler()
{
- for (const auto &node : _nodes) {
- if (node) {
- node->shutdown_feed_handler();
- }
- }
+ _feed_handler.reset();
}
void
@@ -363,13 +419,13 @@ BmCluster::start(BmFeed& feed)
if (_params.needs_message_bus()) {
start_message_bus();
}
- create_feed_handlers();
+ create_feed_handler();
}
void
BmCluster::stop()
{
- shutdown_feed_handlers();
+ shutdown_feed_handler();
stop_message_bus();
shutdown_distributors();
shutdown_service_layers();
@@ -378,7 +434,7 @@ BmCluster::stop()
IBmFeedHandler*
BmCluster::get_feed_handler()
{
- return get_node(0).get_feed_handler();
+ return _feed_handler.get();
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
index f1978dfe0bc..db23cd986e0 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
@@ -13,7 +13,12 @@ class ConfigSet;
}
-namespace document { class DocumentTypeRepo; }
+namespace document {
+
+class DocumentTypeRepo;
+class FieldSetRepo;
+
+}
namespace document::internal { class InternalDocumenttypesType; }
namespace mbus { class Slobrok; }
namespace storage::rpc { class SharedRpcResources; }
@@ -23,6 +28,7 @@ namespace search::bmcluster {
class BmFeed;
class BmMessageBus;
class BmNode;
+class IBmDistribution;
class IBmFeedHandler;
/*
@@ -46,7 +52,10 @@ class BmCluster {
int _base_port;
std::shared_ptr<DocumenttypesConfig> _document_types;
std::shared_ptr<const document::DocumentTypeRepo> _repo;
+ std::unique_ptr<const document::FieldSetRepo> _field_set_repo;
+ std::shared_ptr<const IBmDistribution> _distribution;
std::vector<std::unique_ptr<BmNode>> _nodes;
+ std::unique_ptr<IBmFeedHandler> _feed_handler;
public:
BmCluster(const vespalib::string& base_dir, int base_port, const BmClusterParams& params, std::shared_ptr<DocumenttypesConfig> document_types, std::shared_ptr<const document::DocumentTypeRepo> repo);
@@ -60,8 +69,8 @@ public:
void stop_rpc_client();
void start_service_layers();
void start_distributors();
- void create_feed_handlers();
- void shutdown_feed_handlers();
+ void create_feed_handler();
+ void shutdown_feed_handler();
void shutdown_distributors();
void shutdown_service_layers();
void create_buckets(BmFeed &feed);
@@ -70,9 +79,9 @@ public:
void stop();
storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; }
BmMessageBus& get_message_bus() { return *_message_bus; }
+ const IBmDistribution& get_distribution() { return *_distribution; }
void make_node(uint32_t node_idx);
void make_nodes();
- BmNode& get_node(uint32_t node_idx);
IBmFeedHandler* get_feed_handler();
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
index e4e85e0273f..38ef1b45254 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bm_cluster_controller.h"
+#include "i_bm_distribution.h"
#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
@@ -8,7 +9,6 @@
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/slobrok/sbmirror.h>
-#include <vespa/vespalib/stllike/asciistream.h>
using storage::api::StorageMessageAddress;
using storage::rpc::SharedRpcResources;
@@ -19,11 +19,9 @@ namespace search::bmcluster {
namespace {
FRT_RPCRequest *
-make_set_cluster_state_request(uint32_t num_nodes)
+make_set_cluster_state_request(const IBmDistribution& distribution)
{
- vespalib::asciistream s;
- s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes;
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str()));
+ storage::lib::ClusterStateBundle bundle(distribution.get_cluster_state_bundle());
storage::rpc::SlimeClusterStateBundleCodec codec;
auto encoded_bundle = codec.encode(bundle);
auto *req = new FRT_RPCRequest();
@@ -37,9 +35,9 @@ make_set_cluster_state_request(uint32_t num_nodes)
}
-BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, uint32_t num_nodes)
+BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, const IBmDistribution &distribution)
: _shared_rpc_resources(shared_rpc_resources_in),
- _num_nodes(num_nodes)
+ _distribution(distribution)
{
}
@@ -48,7 +46,7 @@ BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor)
{
static vespalib::string _storage("storage");
StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx);
- auto req = make_set_cluster_state_request(_num_nodes);
+ auto req = make_set_cluster_state_request(_distribution);
auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(),
_shared_rpc_resources.target_factory(), 1);
uint64_t fake_bucket_id = 0;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
index d3e261ab72f..3e239787b02 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
@@ -9,15 +9,17 @@ namespace storage::rpc { class SharedRpcResources; }
namespace search::bmcluster {
+class IBmDistribution;
+
/*
* Fake cluster controller that sets cluster state to be up.
*/
class BmClusterController
{
storage::rpc::SharedRpcResources& _shared_rpc_resources;
- uint32_t _num_nodes;
+ const IBmDistribution& _distribution;
public:
- BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, uint32_t num_nodes);
+ BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, const IBmDistribution& distribution);
void set_cluster_up(uint32_t node_idx, bool distributor);
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp
new file mode 100644
index 00000000000..44b253b3d35
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.cpp
@@ -0,0 +1,98 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_distribution.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <cassert>
+
+using storage::lib::ClusterStateBundle;
+
+namespace search::bmcluster {
+
+using DistributionConfigBuilder = BmDistribution::DistributionConfigBuilder;
+
+namespace {
+
+BmDistribution::DistributionConfig
+make_distribution_config(uint32_t num_nodes)
+{
+ DistributionConfigBuilder dc;
+ {
+ DistributionConfigBuilder::Group group;
+ {
+ for (uint32_t i = 0; i < num_nodes; ++i) {
+ DistributionConfigBuilder::Group::Nodes node;
+ node.index = i;
+ group.nodes.push_back(std::move(node));
+ }
+ }
+ group.index = "invalid";
+ group.name = "invalid";
+ group.capacity = 1.0;
+ group.partitions = "";
+ dc.group.push_back(std::move(group));
+ }
+ dc.redundancy = 1;
+ dc.readyCopies = 1;
+ return dc;
+}
+
+ClusterStateBundle
+make_cluster_state_bundle(uint32_t num_nodes)
+{
+ vespalib::asciistream s;
+ s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes;
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str()));
+ return bundle;
+}
+
+}
+
+BmDistribution::BmDistribution(uint32_t num_nodes)
+ : _num_nodes(num_nodes),
+ _distribution_config(make_distribution_config(num_nodes)),
+ _distribution(_distribution_config),
+ _cluster_state_bundle(make_cluster_state_bundle(num_nodes))
+{
+}
+
+BmDistribution::~BmDistribution()
+{
+}
+
+uint32_t
+BmDistribution::get_num_nodes() const
+{
+ return _num_nodes;
+
+}
+
+uint32_t
+BmDistribution::get_service_layer_node_idx(const document::Bucket& bucket) const
+{
+ auto cluster_state = _cluster_state_bundle.getDerivedClusterState(bucket.getBucketSpace());
+ auto nodes = _distribution.getIdealStorageNodes(*cluster_state, bucket.getBucketId());
+ assert(!nodes.empty());
+ return nodes[0];
+}
+
+uint32_t
+BmDistribution::get_distributor_node_idx(const document::Bucket& bucket) const
+{
+ auto cluster_state = _cluster_state_bundle.getDerivedClusterState(bucket.getBucketSpace());
+ return _distribution.getIdealDistributorNode(*cluster_state, bucket.getBucketId());
+}
+
+BmDistribution::DistributionConfig
+BmDistribution::get_distribution_config() const
+{
+ return _distribution_config;
+}
+
+ClusterStateBundle
+BmDistribution::get_cluster_state_bundle() const
+{
+ return _cluster_state_bundle;
+}
+
+};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h
new file mode 100644
index 00000000000..fde89a0d766
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_distribution.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_distribution.h"
+#include <vespa/config-stor-distribution.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
+
+namespace search::bmcluster {
+
+/*
+ * Class for describing cluster toplogy and how messages are
+ * routed from feeders into the cluster.
+ */
+class BmDistribution : public IBmDistribution
+{
+ uint32_t _num_nodes;
+ DistributionConfigBuilder _distribution_config;
+ storage::lib::Distribution _distribution;
+ storage::lib::ClusterStateBundle _cluster_state_bundle;
+
+public:
+ BmDistribution(uint32_t num_nodes);
+ ~BmDistribution() override;
+ uint32_t get_num_nodes() const override;
+ uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const override;
+ uint32_t get_distributor_node_idx(const document::Bucket & bucket) const override;
+ DistributionConfig get_distribution_config() const override;
+ storage::lib::ClusterStateBundle get_cluster_state_bundle() const override;
+};
+
+};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp
new file mode 100644
index 00000000000..95131118368
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.cpp
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_message_bus_routes.h"
+#include <vespa/storageapi/messageapi/storagemessage.h>
+
+namespace search::bmcluster {
+
+BmMessageBusRoutes::BmMessageBusRoutes(uint32_t num_nodes, bool distributor)
+ : BmStorageMessageAddresses(num_nodes, distributor),
+ _routes(num_nodes)
+{
+ for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) {
+ _routes[node_idx] = get_address(node_idx).to_mbus_route();
+ }
+}
+
+BmMessageBusRoutes::~BmMessageBusRoutes() = default;
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h
new file mode 100644
index 00000000000..95116f71345
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus_routes.h
@@ -0,0 +1,24 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bm_storage_message_addresses.h"
+#include <vespa/messagebus/routing/route.h>
+
+namespace search::bmcluster {
+
+/*
+ * Class containing the message bus routes for a set of nodes at
+ * the given layer (service layer or ditributor).
+ */
+class BmMessageBusRoutes : public BmStorageMessageAddresses
+{
+ std::vector<mbus::Route> _routes;
+public:
+ BmMessageBusRoutes(uint32_t num_nodes, bool distributor);
+ ~BmMessageBusRoutes();
+ const mbus::Route& get_route(uint32_t node_idx) const { return _routes[node_idx]; }
+ bool has_route(uint32_t node_idx) const { return node_idx < _routes.size(); }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index bc407321937..bc84812e669 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -10,6 +10,7 @@
#include "storage_api_message_bus_bm_feed_handler.h"
#include "storage_api_rpc_bm_feed_handler.h"
#include "document_api_message_bus_bm_feed_handler.h"
+#include "i_bm_distribution.h"
#include "i_bm_feed_handler.h"
#include "spi_bm_feed_handler.h"
#include <vespa/config-attributes.h>
@@ -26,7 +27,6 @@
#include <vespa/config-upgrading.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/document/bucket/bucketspace.h>
-#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/document/repo/configbuilder.h>
#include <vespa/document/repo/document_type_repo_factory.h>
#include <vespa/document/repo/documenttyperepo.h>
@@ -279,7 +279,7 @@ struct StorageConfigSet
SlobroksConfigBuilder slobroks;
MessagebusConfigBuilder messagebus;
- StorageConfigSet(const vespalib::string &base_dir, uint32_t node_idx, bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ StorageConfigSet(const vespalib::string &base_dir, uint32_t node_idx, bool distributor, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
: config_id(config_id_in),
documenttypes(documenttypes_in),
@@ -296,26 +296,7 @@ struct StorageConfigSet
slobroks(),
messagebus()
{
- {
- auto& dc = stor_distribution;
- {
- StorDistributionConfigBuilder::Group group;
- {
- for (uint32_t i = 0; i < params.get_num_nodes(); ++i) {
- StorDistributionConfigBuilder::Group::Nodes node;
- node.index = i;
- group.nodes.push_back(std::move(node));
- }
- }
- group.index = "invalid";
- group.name = "invalid";
- group.capacity = 1.0;
- group.partitions = "";
- dc.group.push_back(std::move(group));
- }
- dc.redundancy = 1;
- dc.readyCopies = 1;
- }
+ stor_distribution = distribution.get_distribution_config();
stor_server.nodeIndex = node_idx;
stor_server.isDistributor = distributor;
stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
@@ -364,9 +345,9 @@ struct ServiceLayerConfigSet : public StorageConfigSet
StorBucketInitConfigBuilder stor_bucket_init;
StorVisitorConfigBuilder stor_visitor;
- ServiceLayerConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ ServiceLayerConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
- : StorageConfigSet(base_dir, node_idx, false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ : StorageConfigSet(base_dir, node_idx, false, config_id_in, distribution, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
persistence(),
stor_filestor(),
stor_bucket_init(),
@@ -395,9 +376,9 @@ struct DistributorConfigSet : public StorageConfigSet
StorDistributormanagerConfigBuilder stor_distributormanager;
StorVisitordispatcherConfigBuilder stor_visitordispatcher;
- DistributorConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ DistributorConfigSet(const vespalib::string& base_dir, uint32_t node_idx, const vespalib::string& config_id_in, const IBmDistribution& distribution, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
- : StorageConfigSet(base_dir, node_idx, true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ : StorageConfigSet(base_dir, node_idx, true, config_id_in, distribution, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
stor_distributormanager(),
stor_visitordispatcher()
{
@@ -451,12 +432,10 @@ class MyBmNode : public BmNode
MyResourceWriteFilter _write_filter;
proton::test::DiskMemUsageNotifier _disk_mem_usage_notifier;
std::shared_ptr<proton::PersistenceEngine> _persistence_engine;
- std::unique_ptr<const document::FieldSetRepo> _field_set_repo;
ServiceLayerConfigSet _service_layer_config;
DistributorConfigSet _distributor_config;
ConfigSet _config_set;
std::shared_ptr<config::IConfigContext> _config_context;
- std::unique_ptr<IBmFeedHandler> _feed_handler;
std::unique_ptr<mbus::Slobrok> _slobrok;
std::shared_ptr<BmStorageLinkContext> _service_layer_chain_context;
std::unique_ptr<MyServiceLayerProcess> _service_layer;
@@ -472,13 +451,11 @@ public:
void start_service_layer(const BmClusterParams& params) override;
void wait_service_layer() override;
void start_distributor(const BmClusterParams& params) override;
- void create_feed_handler(const BmClusterParams& params) override;
- void shutdown_feed_handler() override;
void shutdown_distributor() override;
void shutdown_service_layer() override;
void wait_service_layer_slobrok() override;
void wait_distributor_slobrok() override;
- IBmFeedHandler* get_feed_handler() override;
+ std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) override;
PersistenceProvider* get_persistence_provider() override;
};
@@ -514,12 +491,10 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod
_write_filter(),
_disk_mem_usage_notifier(),
_persistence_engine(),
- _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
- _service_layer_config(_base_dir, _node_idx, "bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
- _distributor_config(_base_dir, _node_idx, "bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
+ _service_layer_config(_base_dir, _node_idx, "bm-servicelayer", cluster.get_distribution(), *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
+ _distributor_config(_base_dir, _node_idx, "bm-distributor", cluster.get_distribution(), *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
_config_set(),
_config_context(std::make_shared<config::ConfigContext>(_config_set)),
- _feed_handler(),
_slobrok(),
_service_layer_chain_context(),
_service_layer(),
@@ -532,7 +507,6 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod
_persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy);
_service_layer_config.add_builders(_config_set);
_distributor_config.add_builders(_config_set);
- _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, params.get_skip_get_spi_bucket_info());
}
MyBmNode::~MyBmNode()
@@ -636,42 +610,6 @@ MyBmNode::start_distributor(const BmClusterParams& params)
}
void
-MyBmNode::create_feed_handler(const BmClusterParams& params)
-{
- StorageApiRpcService::Params rpc_params;
- // This is the same compression config as the default in stor-communicationmanager.def.
- rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
- rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node();
- if (params.get_use_document_api()) {
- _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(_cluster.get_message_bus());
- } else if (params.get_enable_distributor()) {
- if (params.get_use_storage_chain()) {
- assert(_distributor_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(_cluster.get_message_bus(), true);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(_cluster.get_rpc_client(), _repo, rpc_params, true);
- }
- } else if (params.needs_service_layer()) {
- if (params.get_use_storage_chain()) {
- assert(_service_layer_chain_context);
- _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
- } else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(_cluster.get_message_bus(), false);
- } else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(_cluster.get_rpc_client(), _repo, rpc_params, false);
- }
- }
-}
-
-void
-MyBmNode::shutdown_feed_handler()
-{
- _feed_handler.reset();
-}
-
-void
MyBmNode::shutdown_distributor()
{
if (_distributor) {
@@ -691,10 +629,10 @@ MyBmNode::shutdown_service_layer()
}
}
-IBmFeedHandler*
-MyBmNode::get_feed_handler()
+std::shared_ptr<BmStorageLinkContext>
+MyBmNode::get_storage_link_context(bool distributor)
{
- return _feed_handler.get();
+ return distributor ? _distributor_chain_context : _service_layer_chain_context;
}
PersistenceProvider*
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
index 2287ac90dec..4bb0d63404a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
@@ -22,7 +22,9 @@ namespace search::bmcluster {
class BmCluster;
class BmClusterParams;
+struct BmStorageLinkContext;
class IBmFeedHandler;
+class IBMDistribution;
/*
* Class representing a single benchmark node in a benchmark cluster.
@@ -38,13 +40,11 @@ public:
virtual void start_service_layer(const BmClusterParams& params) = 0;
virtual void wait_service_layer() = 0;
virtual void start_distributor(const BmClusterParams& params) = 0;
- virtual void create_feed_handler(const BmClusterParams& params) = 0;
- virtual void shutdown_feed_handler() = 0;
virtual void shutdown_distributor() = 0;
virtual void shutdown_service_layer() = 0;
virtual void wait_service_layer_slobrok() = 0;
virtual void wait_distributor_slobrok() = 0;
- virtual IBmFeedHandler* get_feed_handler() = 0;
+ virtual std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) = 0;
virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0;
static unsigned int num_ports();
static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port);
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp
new file mode 100644
index 00000000000..45145a6f65c
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.cpp
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_storage_message_addresses.h"
+#include <vespa/storageapi/messageapi/storagemessage.h>
+
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+namespace search::bmcluster {
+
+namespace {
+
+vespalib::string _Storage("storage");
+
+}
+
+BmStorageMessageAddresses::BmStorageMessageAddresses(uint32_t num_nodes, bool distributor)
+ : _addresses(num_nodes)
+{
+ for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) {
+ _addresses[node_idx] = std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx);
+ }
+}
+
+BmStorageMessageAddresses::~BmStorageMessageAddresses() = default;
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h
new file mode 100644
index 00000000000..93b0ed56168
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_storage_message_addresses.h
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+namespace storage::api { class StorageMessageAddress; }
+
+namespace search::bmcluster {
+
+/*
+ * Class representing the storage message addresses for a set of nodes at
+ * the given layer (service layer or ditributor).
+ */
+class BmStorageMessageAddresses
+{
+ using StorageMessageAddress = storage::api::StorageMessageAddress;
+ std::vector<std::unique_ptr<const StorageMessageAddress>> _addresses;
+public:
+ BmStorageMessageAddresses(uint32_t num_nodes, bool distributor);
+ ~BmStorageMessageAddresses();
+ const StorageMessageAddress &get_address(uint32_t node_idx) const { return *_addresses[node_idx]; }
+ bool has_address(uint32_t node_idx) const { return node_idx < _addresses.size(); }
+
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
index 6670707ed39..4de1f21b86f 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.cpp
@@ -5,10 +5,9 @@
namespace search::bmcluster {
-BucketInfoQueue::BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors)
+BucketInfoQueue::BucketInfoQueue(std::atomic<uint32_t>& errors)
: _mutex(),
- _buckets(),
- _provider(provider),
+ _pending_get_bucket_infos(),
_errors(errors)
{
}
@@ -22,11 +21,11 @@ void
BucketInfoQueue::get_bucket_info_loop()
{
std::unique_lock guard(_mutex);
- while (!_buckets.empty()) {
- auto bucket = _buckets.front();
- _buckets.pop_front();
+ while (!_pending_get_bucket_infos.empty()) {
+ auto pending_get_bucket_info = _pending_get_bucket_infos.front();
+ _pending_get_bucket_infos.pop_front();
guard.unlock();
- auto bucket_info = _provider.getBucketInfo(bucket);
+ auto bucket_info = pending_get_bucket_info.second->getBucketInfo(pending_get_bucket_info.first);
if (bucket_info.hasError()) {
++_errors;
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h
index 1a48f9fa478..0cc66a54fed 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bucket_info_queue.h
@@ -18,19 +18,19 @@ namespace search::bmcluster {
*/
class BucketInfoQueue
{
+ using PendingGetBucketInfo = std::pair<storage::spi::Bucket, storage::spi::PersistenceProvider*>;
std::mutex _mutex;
- std::deque<storage::spi::Bucket> _buckets;
- storage::spi::PersistenceProvider& _provider;
+ std::deque<PendingGetBucketInfo> _pending_get_bucket_infos;
std::atomic<uint32_t>& _errors;
public:
- BucketInfoQueue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors);
+ BucketInfoQueue(std::atomic<uint32_t>& errors);
~BucketInfoQueue();
- void put_bucket(storage::spi::Bucket bucket)
+ void put_bucket(storage::spi::Bucket bucket, storage::spi::PersistenceProvider* provider)
{
std::lock_guard guard(_mutex);
- _buckets.emplace_back(std::move(bucket));
+ _pending_get_bucket_infos.emplace_back(std::make_pair(std::move(bucket), provider));
}
void get_bucket_info_loop();
diff --git a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp
index c6f2626f27c..8e5c65ac5b6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.cpp
@@ -2,6 +2,7 @@
#include "document_api_message_bus_bm_feed_handler.h"
#include "bm_message_bus.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
@@ -19,53 +20,56 @@ using storage::lib::NodeType;
namespace search::bmcluster {
-namespace {
- vespalib::string _Storage("storage");
-}
-
-DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus)
+DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution)
: IBmFeedHandler(),
_name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, NodeType::DISTRIBUTOR, 0)),
_message_bus(message_bus),
- _route(_storage_address->to_mbus_route())
+ _routes(distribution.get_num_nodes(), true),
+ _no_route_error_count(0u),
+ _distribution(distribution)
{
}
DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default;
void
-DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
+DocumentApiMessageBusBmFeedHandler::send_msg(const document::Bucket& bucket, std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
{
- _message_bus.send_msg(std::move(msg), _route, pending_tracker);
+ uint32_t node_idx = _distribution.get_distributor_node_idx(bucket);
+ if (_routes.has_route(node_idx)) {
+ auto &route = _routes.get_route(node_idx);
+ _message_bus.send_msg(std::move(msg), route, pending_tracker);
+ } else {
+ ++_no_route_error_count;
+ }
}
void
-DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker)
+DocumentApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker)
{
auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document));
- send_msg(std::move(msg), tracker);
+ send_msg(bucket, std::move(msg), tracker);
}
void
-DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker)
+DocumentApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker)
{
auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update));
- send_msg(std::move(msg), tracker);
+ send_msg(bucket, std::move(msg), tracker);
}
void
-DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker)
+DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t, PendingTracker& tracker)
{
auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id);
- send_msg(std::move(msg), tracker);
+ send_msg(bucket, std::move(msg), tracker);
}
void
-DocumentApiMessageBusBmFeedHandler::get(const document::Bucket&, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
+DocumentApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
{
auto msg = std::make_unique<documentapi::GetDocumentMessage>(document_id, field_set_string);
- send_msg(std::move(msg), tracker);
+ send_msg(bucket, std::move(msg), tracker);
}
void
@@ -76,7 +80,7 @@ DocumentApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
uint32_t
DocumentApiMessageBusBmFeedHandler::get_error_count() const
{
- return _message_bus.get_error_count();
+ return _message_bus.get_error_count() + _no_route_error_count;
}
const vespalib::string&
diff --git a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h
index 5358e0a948b..907c18a309a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/document_api_message_bus_bm_feed_handler.h
@@ -3,15 +3,15 @@
#pragma once
#include "i_bm_feed_handler.h"
-#include <vespa/messagebus/routing/route.h>
+#include "bm_message_bus_routes.h"
+#include <atomic>
-namespace document { class DocumentTypeRepo; }
namespace documentapi { class DocumentMessage; };
-namespace storage::api { class StorageMessageAddress; }
namespace search::bmcluster {
class BmMessageBus;
+class IBmDistribution;
/*
* Benchmark feed handler for feed to distributor using document api protocol
@@ -19,13 +19,14 @@ class BmMessageBus;
*/
class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler
{
- vespalib::string _name;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- BmMessageBus& _message_bus;
- mbus::Route _route;
- void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
+ vespalib::string _name;
+ BmMessageBus& _message_bus;
+ BmMessageBusRoutes _routes;
+ std::atomic<uint32_t> _no_route_error_count;
+ const IBmDistribution& _distribution;
+ void send_msg(const document::Bucket& bucket, std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
public:
- DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus);
+ DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution);
~DocumentApiMessageBusBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h b/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h
new file mode 100644
index 00000000000..f2c18767d1d
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/i_bm_distribution.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+
+namespace document { class Bucket; }
+
+namespace vespa::config::content::internal {
+ class InternalStorDistributionType;
+}
+
+namespace storage::lib { class ClusterStateBundle; }
+
+namespace search::bmcluster {
+
+/*
+ * Interface class for describing cluster toplogy and how messages are
+ * routed from feeders into the cluster.
+ */
+class IBmDistribution {
+public:
+ using DistributionConfigBuilder = vespa::config::content::internal::InternalStorDistributionType;
+ using DistributionConfig = const DistributionConfigBuilder;
+
+ virtual ~IBmDistribution() = default;
+ virtual uint32_t get_num_nodes() const = 0;
+ virtual uint32_t get_service_layer_node_idx(const document::Bucket & bucket) const = 0;
+ virtual uint32_t get_distributor_node_idx(const document::Bucket & bucket) const = 0;
+ virtual DistributionConfig get_distribution_config() const = 0;
+ virtual storage::lib::ClusterStateBundle get_cluster_state_bundle() const = 0;
+};
+
+};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp
index 247bf8bece3..bb19bcaefdf 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.cpp
@@ -47,9 +47,9 @@ PendingTracker::drain()
}
void
-PendingTracker::attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors)
+PendingTracker::attach_bucket_info_queue(std::atomic<uint32_t>& errors)
{
- _bucket_info_queue = std::make_unique<BucketInfoQueue>(provider, errors);
+ _bucket_info_queue = std::make_unique<BucketInfoQueue>(errors);
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h
index a8fa2f77396..cb8dc823b9b 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/pending_tracker.h
@@ -5,8 +5,6 @@
#include <atomic>
#include <memory>
-namespace storage::spi { struct PersistenceProvider; }
-
namespace search::bmcluster {
class BucketInfoQueue;
@@ -30,7 +28,7 @@ public:
void retain();
void drain();
- void attach_bucket_info_queue(storage::spi::PersistenceProvider& provider, std::atomic<uint32_t>& errors);
+ void attach_bucket_info_queue(std::atomic<uint32_t>& errors);
BucketInfoQueue *get_bucket_info_queue() { return _bucket_info_queue.get(); }
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index e905b493cf4..e0822db6f0c 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "spi_bm_feed_handler.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include "bucket_info_queue.h"
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -31,18 +32,20 @@ void get_bucket_info_loop(PendingTracker &tracker)
class MyOperationComplete : public storage::spi::OperationComplete
{
+ PersistenceProvider* _provider;
std::atomic<uint32_t> &_errors;
Bucket _bucket;
PendingTracker& _tracker;
public:
- MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
+ MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
~MyOperationComplete() override;
void onComplete(std::unique_ptr<storage::spi::Result> result) override;
void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
};
-MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker)
- : _errors(errors),
+MyOperationComplete::MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker)
+ : _provider(provider),
+ _errors(errors),
_bucket(bucket),
_tracker(tracker)
{
@@ -62,7 +65,7 @@ MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
} else {
auto bucket_info_queue = _tracker.get_bucket_info_queue();
if (bucket_info_queue != nullptr) {
- bucket_info_queue->put_bucket(_bucket);
+ bucket_info_queue->put_bucket(_bucket, _provider);
}
}
}
@@ -75,50 +78,81 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result
}
-SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider, const document::FieldSetRepo &field_set_repo, bool skip_get_spi_bucket_info)
+SpiBmFeedHandler::SpiBmFeedHandler(std::vector<PersistenceProvider* >providers, const document::FieldSetRepo &field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info)
: IBmFeedHandler(),
_name(vespalib::string("SpiBmFeedHandler(") + (skip_get_spi_bucket_info ? "skip-get-spi-bucket-info" : "get-spi-bucket-info") + ")"),
- _provider(provider),
+ _providers(std::move(providers)),
_field_set_repo(field_set_repo),
_errors(0u),
- _skip_get_spi_bucket_info(skip_get_spi_bucket_info)
+ _skip_get_spi_bucket_info(skip_get_spi_bucket_info),
+ _distribution(distribution)
{
}
SpiBmFeedHandler::~SpiBmFeedHandler() = default;
+PersistenceProvider*
+SpiBmFeedHandler::get_provider(const document::Bucket& bucket)
+{
+ uint32_t node_idx = _distribution.get_service_layer_node_idx(bucket);
+ if (node_idx >= _providers.size()) {
+ return nullptr;
+ }
+ return _providers[node_idx];
+}
+
void
SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->putAsync(spi_bucket, Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->updateAsync(spi_bucket, Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- _provider.removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, spi_bucket, tracker));
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ } else {
+ ++_errors;
+ }
}
void
SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
{
get_bucket_info_loop(tracker);
- Bucket spi_bucket(bucket);
- auto field_set = _field_set_repo.getFieldSet(field_set_string);
- auto result = _provider.get(spi_bucket, *field_set, document_id, context);
- if (result.hasError()) {
+ auto provider = get_provider(bucket);
+ if (provider) {
+ Bucket spi_bucket(bucket);
+ auto field_set = _field_set_repo.getFieldSet(field_set_string);
+ auto result = provider->get(spi_bucket, *field_set, document_id, context);
+ if (result.hasError()) {
+ ++_errors;
+ }
+ } else {
++_errors;
}
}
@@ -127,7 +161,7 @@ void
SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker)
{
if (!_skip_get_spi_bucket_info) {
- tracker.attach_bucket_info_queue(_provider, _errors);
+ tracker.attach_bucket_info_queue(_errors);
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
index bbc9e3b8e74..07ef0a21e5b 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
@@ -4,24 +4,30 @@
#include "i_bm_feed_handler.h"
#include <atomic>
+#include <vector>
namespace document { class FieldSetRepo; }
namespace storage::spi { struct PersistenceProvider; }
namespace search::bmcluster {
+class IBmDistribution;
+
/*
* Benchmark feed handler for feed directly to persistence provider
*/
class SpiBmFeedHandler : public IBmFeedHandler
{
- vespalib::string _name;
- storage::spi::PersistenceProvider& _provider;
- const document::FieldSetRepo& _field_set_repo;
- std::atomic<uint32_t> _errors;
- bool _skip_get_spi_bucket_info;
+ vespalib::string _name;
+ std::vector<storage::spi::PersistenceProvider*> _providers;
+ const document::FieldSetRepo& _field_set_repo;
+ std::atomic<uint32_t> _errors;
+ bool _skip_get_spi_bucket_info;
+ const IBmDistribution& _distribution;
+
+ storage::spi::PersistenceProvider* get_provider(const document::Bucket &bucket);
public:
- SpiBmFeedHandler(storage::spi::PersistenceProvider& provider, const document::FieldSetRepo& field_set_repo, bool skip_get_spi_bucket_info);
+ SpiBmFeedHandler(std::vector<storage::spi::PersistenceProvider*> providers, const document::FieldSetRepo& field_set_repo, const IBmDistribution& distribution, bool skip_get_spi_bucket_info);
~SpiBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
index 34669b8cbdc..f7df593a75e 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storage_api_chain_bm_feed_handler.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include "storage_reply_error_checker.h"
#include "bm_storage_link_context.h"
@@ -20,27 +21,14 @@ using document::DocumentUpdate;
namespace search::bmcluster {
-namespace {
-
-std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
- auto cmd = std::make_shared<storage::api::SetSystemStateCommand>(bundle);
- cmd->setPriority(storage::api::StorageMessage::VERYHIGH);
- return cmd;
-}
-
-}
-
-StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor)
+StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor)
: IBmFeedHandler(),
_name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
- _context(std::move(context))
+ _contexts(std::move(contexts)),
+ _no_link_error_count(0u),
+ _distribution(distribution)
{
- auto cmd = make_set_cluster_state_cmd();
- PendingTracker tracker(1);
- send_msg(std::move(cmd), tracker);
- tracker.drain();
}
StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
@@ -48,10 +36,20 @@ StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
void
StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
{
- cmd->setSourceIndex(0);
- auto bm_link = _context->bm_link;
- bm_link->retain(cmd->getMsgId(), pending_tracker);
- bm_link->sendDown(std::move(cmd));
+ auto bucket = cmd->getBucket();
+ if (_distributor) {
+ cmd->setSourceIndex(0);
+ } else {
+ cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
+ }
+ uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ if (node_idx < _contexts.size() && _contexts[node_idx]) {
+ auto bm_link = _contexts[node_idx]->bm_link;
+ bm_link->retain(cmd->getMsgId(), pending_tracker);
+ bm_link->sendDown(std::move(cmd));
+ } else {
+ ++_no_link_error_count;
+ }
}
void
@@ -90,7 +88,14 @@ StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
uint32_t
StorageApiChainBmFeedHandler::get_error_count() const
{
- return _context->bm_link->get_error_count();
+ uint32_t error_count = 0;
+ for (auto &context : _contexts) {
+ if (context) {
+ error_count += context->bm_link->get_error_count();
+ }
+ }
+ error_count += _no_link_error_count;
+ return error_count;
}
const vespalib::string&
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
index 1c196d746eb..d4ba06403e3 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
@@ -3,12 +3,15 @@
#pragma once
#include "i_bm_feed_handler.h"
+#include <vector>
+#include <atomic>
namespace storage::api { class StorageCommand; }
namespace search::bmcluster {
struct BmStorageLinkContext;
+class IBmDistribution;
/*
* Benchmark feed handler for feed to service layer or distributor
@@ -17,12 +20,15 @@ struct BmStorageLinkContext;
class StorageApiChainBmFeedHandler : public IBmFeedHandler
{
private:
- vespalib::string _name;
- bool _distributor;
- std::shared_ptr<BmStorageLinkContext> _context;
+ vespalib::string _name;
+ bool _distributor;
+ std::vector<std::shared_ptr<BmStorageLinkContext>> _contexts;
+ std::atomic<uint32_t> _no_link_error_count;
+ const IBmDistribution& _distribution;
+
void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
- StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor);
+ StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor);
~StorageApiChainBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
index 04561b5d93e..4f84be508db 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
@@ -2,6 +2,7 @@
#include "storage_api_message_bus_bm_feed_handler.h"
#include "bm_message_bus.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
@@ -17,16 +18,14 @@ using storage::lib::NodeType;
namespace search::bmcluster {
-namespace {
- vespalib::string _Storage("storage");
-}
-StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor)
+StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor)
: IBmFeedHandler(),
_name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
_message_bus(message_bus),
- _route(_storage_address->to_mbus_route())
+ _routes(distribution.get_num_nodes(), distributor),
+ _no_route_error_count(0),
+ _distribution(distribution)
{
}
@@ -35,9 +34,20 @@ StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = defaul
void
StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
{
- cmd->setSourceIndex(0);
+ auto bucket = cmd->getBucket();
+ if (_distributor) {
+ cmd->setSourceIndex(0);
+ } else {
+ cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
+ }
auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
- _message_bus.send_msg(std::move(msg), _route, pending_tracker);
+ uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ if (_routes.has_route(node_idx)) {
+ auto& route = _routes.get_route(node_idx);
+ _message_bus.send_msg(std::move(msg), route, pending_tracker);
+ } else {
+ ++_no_route_error_count;
+ }
}
void
@@ -76,7 +86,7 @@ StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
uint32_t
StorageApiMessageBusBmFeedHandler::get_error_count() const
{
- return _message_bus.get_error_count();
+ return _message_bus.get_error_count() + _no_route_error_count;
}
const vespalib::string&
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
index 0027f260b8f..c02e594964a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
@@ -3,18 +3,15 @@
#pragma once
#include "i_bm_feed_handler.h"
-#include <vespa/messagebus/routing/route.h>
+#include "bm_message_bus_routes.h"
+#include <atomic>
-namespace document { class DocumentTypeRepo; }
-namespace documentapi { class DocumentMessage; };
-namespace storage::api {
-class StorageCommand;
-class StorageMessageAddress;
-}
+namespace storage::api { class StorageCommand; }
namespace search::bmcluster {
class BmMessageBus;
+class IBmDistribution;
/*
* Benchmark feed handler for feed to service layer or distributor
@@ -22,14 +19,15 @@ class BmMessageBus;
*/
class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
{
- vespalib::string _name;
- bool _distributor;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- BmMessageBus& _message_bus;
- mbus::Route _route;
+ vespalib::string _name;
+ bool _distributor;
+ BmMessageBus& _message_bus;
+ BmMessageBusRoutes _routes;
+ std::atomic<uint32_t> _no_route_error_count;
+ const IBmDistribution& _distribution;
void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
- StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor);
+ StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor);
~StorageApiMessageBusBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
index 3e0426cb308..57d50853c71 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storage_api_rpc_bm_feed_handler.h"
+#include "i_bm_distribution.h"
#include "pending_tracker.h"
#include "pending_tracker_hash.h"
#include "storage_reply_error_checker.h"
@@ -24,10 +25,6 @@ using storage::lib::NodeType;
namespace search::bmcluster {
-namespace {
- vespalib::string _Storage("storage");
-}
-
class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher,
public StorageReplyErrorChecker
{
@@ -66,15 +63,18 @@ StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in,
std::shared_ptr<const DocumentTypeRepo> repo,
const StorageApiRpcService::Params& rpc_params,
+ const IBmDistribution& distribution,
bool distributor)
: IBmFeedHandler(),
_name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
- _storage_address(std::make_unique<StorageMessageAddress>(&_Storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
+ _addresses(distribution.get_num_nodes(), distributor),
+ _no_address_error_count(0u),
_shared_rpc_resources(shared_rpc_resources_in),
_message_dispatcher(std::make_unique<MyMessageDispatcher>()),
_message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo)),
- _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params))
+ _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)),
+ _distribution(distribution)
{
}
@@ -83,10 +83,20 @@ StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
void
StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
{
- cmd->setSourceIndex(0);
- cmd->setAddress(*_storage_address);
- _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
- _rpc_client->send_rpc_v1_request(std::move(cmd));
+ auto bucket = cmd->getBucket();
+ if (_distributor) {
+ cmd->setSourceIndex(0);
+ } else {
+ cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
+ }
+ uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ if (_addresses.has_address(node_idx)) {
+ cmd->setAddress(_addresses.get_address(node_idx));
+ _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
+ _rpc_client->send_rpc_v1_request(std::move(cmd));
+ } else {
+ ++_no_address_error_count;
+ }
}
void
@@ -125,7 +135,7 @@ StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
uint32_t
StorageApiRpcBmFeedHandler::get_error_count() const
{
- return _message_dispatcher->get_error_count();
+ return _message_dispatcher->get_error_count() + _no_address_error_count;
}
const vespalib::string&
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
index 360f702e590..48b6233d0a4 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
@@ -3,6 +3,7 @@
#pragma once
#include "i_bm_feed_handler.h"
+#include "bm_storage_message_addresses.h"
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
namespace document { class DocumentTypeRepo; }
@@ -18,6 +19,8 @@ class SharedRpcResources;
namespace search::bmcluster {
+class IBmDistribution;
+
/*
* Benchmark feed handler for feed to service layer or distributor
* using storage api protocol over rpc.
@@ -27,17 +30,20 @@ class StorageApiRpcBmFeedHandler : public IBmFeedHandler
class MyMessageDispatcher;
vespalib::string _name;
bool _distributor;
- std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
- storage::rpc::SharedRpcResources& _shared_rpc_resources;
- std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
+ BmStorageMessageAddresses _addresses;
+ std::atomic<uint32_t> _no_address_error_count;
+ storage::rpc::SharedRpcResources& _shared_rpc_resources;
+ std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider;
std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client;
+ const IBmDistribution& _distribution;
void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in,
std::shared_ptr<const document::DocumentTypeRepo> repo,
const storage::rpc::StorageApiRpcService::Params& rpc_params,
+ const IBmDistribution& distribution,
bool distributor);
~StorageApiRpcBmFeedHandler();
void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h
index 2fcb6aad14a..62a7190b14a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_reply_error_checker.h
@@ -8,6 +8,9 @@ namespace storage::api { class StorageMessage; }
namespace search::bmcluster {
+/*
+ * Class tracking errors in storage replies.
+ */
class StorageReplyErrorChecker {
protected:
std::atomic<uint32_t> _errors;