aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-16 15:07:25 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-16 15:07:25 +0200
commit02021ad79090f7ffa9f8562971d5e41d397dfbe4 (patch)
treeda324b54e738fa23188af966787530944ee4fadb
parentb9b2d10776ded0d34a2f0b944d3edba972114b5c (diff)
Increase lifetime of BmClusterController.
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h5
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.h1
8 files changed, 58 insertions, 36 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
index b7886a08325..adb7f2dd12b 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -149,6 +149,7 @@ BmCluster::BmCluster(const vespalib::string& base_dir, int base_port, const BmCl
_field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
_distribution(std::make_shared<const BmDistribution>(params.get_num_nodes())),
_nodes(params.get_num_nodes()),
+ _cluster_controller(std::make_shared<BmClusterController>(*this, *_distribution)),
_feed_handler()
{
_message_bus_config->add_builders(*_config_set);
@@ -307,14 +308,7 @@ BmCluster::start_service_layers()
node->wait_service_layer_slobrok();
}
}
- BmClusterController fake_controller(get_rpc_client(), *_distribution);
- uint32_t node_idx = 0;
- for (const auto &node : _nodes) {
- if (node) {
- fake_controller.set_cluster_up(node_idx, false);
- }
- ++node_idx;
- }
+ _cluster_controller->propagate_cluster_state(false);
}
void
@@ -330,14 +324,7 @@ BmCluster::start_distributors()
node->wait_distributor_slobrok();
}
}
- BmClusterController fake_controller(get_rpc_client(), *_distribution);
- uint32_t node_idx = 0;
- for (const auto &node : _nodes) {
- if (node) {
- fake_controller.set_cluster_up(node_idx, true);
- }
- ++node_idx;
- }
+ _cluster_controller->propagate_cluster_state(true);
// Wait for bucket ownership transfer safe time
std::this_thread::sleep_for(2s);
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
index db23cd986e0..e8f72ec8254 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.h
@@ -25,6 +25,7 @@ namespace storage::rpc { class SharedRpcResources; }
namespace search::bmcluster {
+class BmClusterController;
class BmFeed;
class BmMessageBus;
class BmNode;
@@ -55,6 +56,7 @@ class BmCluster {
std::unique_ptr<const document::FieldSetRepo> _field_set_repo;
std::shared_ptr<const IBmDistribution> _distribution;
std::vector<std::unique_ptr<BmNode>> _nodes;
+ std::shared_ptr<BmClusterController> _cluster_controller;
std::unique_ptr<IBmFeedHandler> _feed_handler;
public:
@@ -77,12 +79,15 @@ public:
void initialize_providers();
void start(BmFeed &feed);
void stop();
+ const storage::rpc::SharedRpcResources &get_rpc_client() const { return *_rpc_client; }
storage::rpc::SharedRpcResources &get_rpc_client() { return *_rpc_client; }
BmMessageBus& get_message_bus() { return *_message_bus; }
const IBmDistribution& get_distribution() { return *_distribution; }
void make_node(uint32_t node_idx);
void make_nodes();
IBmFeedHandler* get_feed_handler();
+ uint32_t get_num_nodes() const { return _nodes.size(); }
+ BmNode *get_node(uint32_t node_idx) const { return node_idx < _nodes.size() ? _nodes[node_idx].get() : nullptr; }
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
index 38ef1b45254..60811e10fe6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
@@ -1,6 +1,8 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bm_cluster_controller.h"
+#include "bm_cluster.h"
+#include "bm_node.h"
#include "i_bm_distribution.h"
#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
@@ -35,20 +37,21 @@ make_set_cluster_state_request(const IBmDistribution& distribution)
}
-BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, const IBmDistribution &distribution)
- : _shared_rpc_resources(shared_rpc_resources_in),
+BmClusterController::BmClusterController(BmCluster& cluster, const IBmDistribution &distribution)
+ : _cluster(cluster),
_distribution(distribution)
{
}
void
-BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor)
+BmClusterController::propagate_cluster_state(uint32_t node_idx, bool distributor)
{
static vespalib::string _storage("storage");
StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx);
auto req = make_set_cluster_state_request(_distribution);
- auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(),
- _shared_rpc_resources.target_factory(), 1);
+ auto& shared_rpc_resources = _cluster.get_rpc_client();
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(shared_rpc_resources.slobrok_mirror(),
+ shared_rpc_resources.target_factory(), 1);
uint64_t fake_bucket_id = 0;
auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id);
target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
@@ -56,4 +59,23 @@ BmClusterController::set_cluster_up(uint32_t node_idx, bool distributor)
req->SubRef();
}
+void
+BmClusterController::propagate_cluster_state(bool distributor)
+{
+ uint32_t num_nodes = _cluster.get_num_nodes();
+ for (uint32_t node_idx = 0; node_idx < num_nodes; ++node_idx) {
+ auto node = _cluster.get_node(node_idx);
+ if (node && node->has_storage_layer(distributor)) {
+ propagate_cluster_state(node_idx, distributor);
+ }
+ }
+}
+
+void
+BmClusterController::propagate_cluster_state()
+{
+ propagate_cluster_state(false);
+ propagate_cluster_state(true);
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
index 3e239787b02..b56ee2f5627 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
@@ -4,11 +4,9 @@
#include <cstdint>
-namespace storage::api { class StorageMessageAddress; }
-namespace storage::rpc { class SharedRpcResources; }
-
namespace search::bmcluster {
+class BmCluster;
class IBmDistribution;
/*
@@ -16,11 +14,13 @@ class IBmDistribution;
*/
class BmClusterController
{
- storage::rpc::SharedRpcResources& _shared_rpc_resources;
+ const BmCluster& _cluster;
const IBmDistribution& _distribution;
public:
- BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, const IBmDistribution& distribution);
- void set_cluster_up(uint32_t node_idx, bool distributor);
+ BmClusterController(BmCluster& cluster, const IBmDistribution& distribution);
+ void propagate_cluster_state(uint32_t node_idx, bool distributor);
+ void propagate_cluster_state(bool distributor);
+ void propagate_cluster_state();
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
index 8ce5d34b4bc..55160f5d561 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.cpp
@@ -36,15 +36,15 @@ BmFeeder::BmFeeder(std::shared_ptr<const DocumentTypeRepo> repo, IBmFeedHandler&
_bucket_space(document::test::makeBucketSpace("test")),
_feed_handler(feed_handler),
_executor(executor),
- _all_fields(document::AllFields::NAME)
-
+ _all_fields(document::AllFields::NAME),
+ _use_timestamp(!_feed_handler.manages_timestamp())
{
}
BmFeeder::~BmFeeder() = default;
void
-BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, bool use_timestamp, PendingTracker& tracker)
+BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker)
{
document::BucketId bucket_id;
BmFeedOperation feed_op;
@@ -57,7 +57,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed,
serialized_feed >> bucket_id;
document::Bucket bucket(_bucket_space, bucket_id);
auto document = std::make_unique<Document>(*_repo, serialized_feed);
- _feed_handler.put(bucket, std::move(document), (use_timestamp ? (time_bias + op_idx) : 0), tracker);
+ _feed_handler.put(bucket, std::move(document), (_use_timestamp ? (time_bias + op_idx) : 0), tracker);
}
break;
case BmFeedOperation::UPDATE_OPERATION:
@@ -65,7 +65,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed,
serialized_feed >> bucket_id;
document::Bucket bucket(_bucket_space, bucket_id);
auto document_update = DocumentUpdate::createHEAD(*_repo, serialized_feed);
- _feed_handler.update(bucket, std::move(document_update), (use_timestamp ? (time_bias + op_idx) : 0), tracker);
+ _feed_handler.update(bucket, std::move(document_update), (_use_timestamp ? (time_bias + op_idx) : 0), tracker);
}
break;
case BmFeedOperation::GET_OPERATION:
@@ -81,7 +81,7 @@ BmFeeder::feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed,
serialized_feed >> bucket_id;
document::Bucket bucket(_bucket_space, bucket_id);
DocumentId document_id(serialized_feed);
- _feed_handler.remove(bucket, document_id, (use_timestamp ? (time_bias + op_idx) : 0), tracker);
+ _feed_handler.remove(bucket, document_id, (_use_timestamp ? (time_bias + op_idx) : 0), tracker);
}
break;
default:
@@ -99,9 +99,8 @@ BmFeeder::feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostre
PendingTracker pending_tracker(max_pending);
_feed_handler.attach_bucket_info_queue(pending_tracker);
vespalib::nbostream is(serialized_feed.data(), serialized_feed.size());
- bool use_timestamp = !_feed_handler.manages_timestamp();
for (uint32_t i = range.get_start(); i < range.get_end(); ++i) {
- feed_operation(i, is, time_bias, use_timestamp, pending_tracker);
+ feed_operation(i, is, time_bias, pending_tracker);
}
assert(is.empty());
pending_tracker.drain();
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
index d6391c30b00..1001f572bf2 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_feeder.h
@@ -35,10 +35,11 @@ class BmFeeder {
IBmFeedHandler& _feed_handler;
vespalib::ThreadStackExecutor& _executor;
vespalib::string _all_fields;
+ bool _use_timestamp;
public:
BmFeeder(std::shared_ptr<const document::DocumentTypeRepo> repo, IBmFeedHandler& feed_handler, vespalib::ThreadStackExecutor& executor);
~BmFeeder();
- void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, bool use_timestamp, PendingTracker& tracker);
+ void feed_operation(uint32_t op_idx, vespalib::nbostream &serialized_feed, int64_t time_bias, PendingTracker& tracker);
void feed_task(uint32_t max_pending, BmRange range, const vespalib::nbostream &serialized_feed, int64_t time_bias);
void run_feed_tasks(int pass, int64_t& time_bias, const std::vector<vespalib::nbostream>& serialized_feed_v, const BmFeedParams& params, AvgSampler& sampler, const vespalib::string& op_name);
IBmFeedHandler& get_feed_handler() const { return _feed_handler; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 0591338a086..7a297e82655 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -456,6 +456,7 @@ public:
void wait_service_layer_slobrok() override;
void wait_distributor_slobrok() override;
std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) override;
+ bool has_storage_layer(bool distributor) const override;
PersistenceProvider* get_persistence_provider() override;
};
@@ -635,6 +636,12 @@ MyBmNode::get_storage_link_context(bool distributor)
return distributor ? _distributor_chain_context : _service_layer_chain_context;
}
+bool
+MyBmNode::has_storage_layer(bool distributor) const
+{
+ return distributor ? static_cast<bool>(_distributor) : static_cast<bool>(_service_layer);
+}
+
PersistenceProvider*
MyBmNode::get_persistence_provider()
{
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
index 4bb0d63404a..31eb6fc15ff 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
@@ -45,6 +45,7 @@ public:
virtual void wait_service_layer_slobrok() = 0;
virtual void wait_distributor_slobrok() = 0;
virtual std::shared_ptr<BmStorageLinkContext> get_storage_link_context(bool distributor) = 0;
+ virtual bool has_storage_layer(bool distributor) const = 0;
virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0;
static unsigned int num_ports();
static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, uint32_t node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port);