diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-09-16 15:07:25 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-09-16 15:07:25 +0200 |
commit | 02021ad79090f7ffa9f8562971d5e41d397dfbe4 (patch) | |
tree | da324b54e738fa23188af966787530944ee4fadb | |
parent | b9b2d10776ded0d34a2f0b944d3edba972114b5c (diff) |
Increase lifetime of BmClusterController.
8 files changed, 58 insertions, 36 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp index b7886a08325..adb7f2dd12b 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp @@ -149,6 +149,7 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl _field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)), _distribution(std::make_shared<const BmDistribution>(params.get_num_nodes())), _nodes(params.get_num_nodes()), + _cluster_controller(std::make_shared<BmClusterController>(*this, *_distribution)), _feed_handler() { _message_bus_config->add_builders(*_config_set); @@ -307,14 +308,7 @@ BmCluster::start_service_layers() node->wait_service_layer_slobrok(); } } - BmClusterController fake_controller(get_rpc_client(), *_distribution); - uint32_t node_idx = 0; - for (const auto &node : _nodes) { - if (node) { - fake_controller.set_cluster_up(node_idx, false); - } - ++node_idx; - } + _cluster_controller->propagate_cluster_state(false); } void @@ -330,14 +324,7 @@ BmCluster::start_distributors() node->wait_distributor_slobrok(); } } - BmClusterController fake_controller(get_rpc_client(), *_distribution); - uint32_t node_idx = 0; - for (const auto &node : _nodes) { - if (node) { - fake_controller.set_cluster_up(node_idx, true); - } - ++node_idx; - } + _cluster_controller->propagate_cluster_state(true); // Wait for bucket ownership transfer safe time std::this_thread::sleep_for(2s); } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h index db23cd986e0..e8f72ec8254 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h @@ -25,6 +25,7 @@ namespace storage::rpc { class SharedRpcResources; } namespace search::bmcluster { +class BmClusterController; class BmFeed; class BmMessageBus; class BmNode; @@ -55,6 +56,7 @@ class BmCluster { std::unique_ptr<const document::FieldSetRepo> _field_set_repo; std::shared_ptr<const IBmDistribution> _distribution; std::vector<std::unique_ptr<BmNode>> _nodes; + std::shared_ptr<BmClusterController> _cluster_controller; std::unique_ptr<IBmFeedHandler> _feed_handler; public: @@ -77,12 +79,15 @@ public: void initialize_providers(); void start(BmFeed &feed); void stop(); + const storage::rpc::SharedRpcResources &get_rpc_client() const { return *_rpc_client; } storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; } BmMessageBus& get_message_bus() { return *_message_bus; } const IBmDistribution& get_distribution() { return *_distribution; } void make_node(uint32_t node_idx); void make_nodes(); IBmFeedHandler* get_feed_handler(); + uint32_t get_num_nodes() const { return _nodes.size(); } + BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; } }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp index 38ef1b45254..60811e10fe6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp @@ -1,6 +1,8 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bm_cluster_controller.h" +#include "bm_cluster.h" +#include "bm_node.h" #include "i_bm_distribution.h" #include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> @@ -35,20 +37,21 @@ make_set_cluster_state_request(const IBmDistribution& distribution) } -BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, const IBmDistribution &distribution) - : _shared_rpc_resources(shared_rpc_resources_in), +BmClusterController::BmClusterController(BmCluster& cluster, const IBmDistribution &distribution) + : _cluster(cluster), _distribution(distribution) { } void -BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor) +BmClusterController::propagate_cluster_state(uint32_t node_idx, bool distributor) { static vespalib::string _storage("storage"); StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx); auto req = make_set_cluster_state_request(_distribution); - auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(), - _shared_rpc_resources.target_factory(), 1); + auto& shared_rpc_resources = _cluster.get_rpc_client(); + auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(), + shared_rpc_resources.target_factory(), 1); uint64_t fake_bucket_id = 0; auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id); target->get()->InvokeSync(req, 10.0); // 10 seconds timeout @@ -56,4 +59,23 @@ BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor) req->SubRef(); } +void +BmClusterController::propagate_cluster_state(bool distributor) +{ + uint32_t num_nodes = _cluster.get_num_nodes(); + for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) { + auto node = _cluster.get_node(node_idx); + if (node && node->has_storage_layer(distributor)) { + propagate_cluster_state(node_idx, distributor); + } + } +} + +void +BmClusterController::propagate_cluster_state() +{ + propagate_cluster_state(false); + propagate_cluster_state(true); +} + } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h index 3e239787b02..b56ee2f5627 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h @@ -4,11 +4,9 @@ #include <cstdint> -namespace storage::api { class StorageMessageAddress; } -namespace storage::rpc { class SharedRpcResources; } - namespace search::bmcluster { +class BmCluster; class IBmDistribution; /* @@ -16,11 +14,13 @@ class IBmDistribution; */ class BmClusterController { - storage::rpc::SharedRpcResources& _shared_rpc_resources; + const BmCluster& _cluster; const IBmDistribution& _distribution; public: - BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, const IBmDistribution& distribution); - void set_cluster_up(uint32_t node_idx, bool distributor); + BmClusterController(BmCluster& cluster, const IBmDistribution& distribution); + void propagate_cluster_state(uint32_t node_idx, bool distributor); + void propagate_cluster_state(bool distributor); + void propagate_cluster_state(); }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp index 8ce5d34b4bc..55160f5d561 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp @@ -36,15 +36,15 @@ BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler& _bucket_space(document::test::makeBucketSpace("test")), _feed_handler(feed_handler), _executor(executor), - _all_fields(document::AllFields::NAME) - + _all_fields(document::AllFields::NAME), + _use_timestamp(!_feed_handler.manages_timestamp()) { } BmFeeder::~BmFeeder() = default; void -BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, bool use_timestamp, PendingTracker& tracker) +BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker) { document::BucketId bucket_id; BmFeedOperation feed_op; @@ -57,7 +57,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, serialized_feed >> bucket_id; document::Bucket bucket(_bucket_space, bucket_id); auto document = std::make_unique<Document>(*_repo, serialized_feed); - _feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + op_idx) : 0), tracker); + _feed_handler.put(bucket, std::move(document), (_use_timestamp ? (time_bias + op_idx) : 0), tracker); } break; case BmFeedOperation::UPDATE_OPERATION: @@ -65,7 +65,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, serialized_feed >> bucket_id; document::Bucket bucket(_bucket_space, bucket_id); auto document_update = DocumentUpdate::createHEAD(*_repo, serialized_feed); - _feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + op_idx) : 0), tracker); + _feed_handler.update(bucket, std::move(document_update), (_use_timestamp ? (time_bias + op_idx) : 0), tracker); } break; case BmFeedOperation::GET_OPERATION: @@ -81,7 +81,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, serialized_feed >> bucket_id; document::Bucket bucket(_bucket_space, bucket_id); DocumentId document_id(serialized_feed); - _feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + op_idx) : 0), tracker); + _feed_handler.remove(bucket, document_id, (_use_timestamp ? (time_bias + op_idx) : 0), tracker); } break; default: @@ -99,9 +99,8 @@ BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostre PendingTracker pending_tracker(max_pending); _feed_handler.attach_bucket_info_queue(pending_tracker); vespalib::nbostream is(serialized_feed.data(), serialized_feed.size()); - bool use_timestamp = !_feed_handler.manages_timestamp(); for (uint32_t i = range.get_start(); i < range.get_end(); ++i) { - feed_operation(i, is, time_bias, use_timestamp, pending_tracker); + feed_operation(i, is, time_bias, pending_tracker); } assert(is.empty()); pending_tracker.drain(); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h index d6391c30b00..1001f572bf2 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h @@ -35,10 +35,11 @@ class BmFeeder { IBmFeedHandler& _feed_handler; vespalib::ThreadStackExecutor& _executor; vespalib::string _all_fields; + bool _use_timestamp; public: BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor); ~BmFeeder(); - void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, bool use_timestamp, PendingTracker& tracker); + void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker); void feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias); void run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler, const vespalib::string& op_name); IBmFeedHandler& get_feed_handler() const { return _feed_handler; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 0591338a086..7a297e82655 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -456,6 +456,7 @@ public: void wait_service_layer_slobrok() override; void wait_distributor_slobrok() override; std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) override; + bool has_storage_layer(bool distributor) const override; PersistenceProvider* get_persistence_provider() override; }; @@ -635,6 +636,12 @@ MyBmNode::get_storage_link_context(bool distributor) return distributor ? _distributor_chain_context : _service_layer_chain_context; } +bool +MyBmNode::has_storage_layer(bool distributor) const +{ + return distributor ? static_cast<bool>(_distributor) : static_cast<bool>(_service_layer); +} + PersistenceProvider* MyBmNode::get_persistence_provider() { diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h index 4bb0d63404a..31eb6fc15ff 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h @@ -45,6 +45,7 @@ public: virtual void wait_service_layer_slobrok() = 0; virtual void wait_distributor_slobrok() = 0; virtual std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) = 0; + virtual bool has_storage_layer(bool distributor) const = 0; virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0; static unsigned int num_ports(); static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port); |