summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-11 00:51:16 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-11 00:51:16 +0200
commitaa49463fe12fd8e501bc923d8b9e225e63f9363b (patch)
tree7f5db802019ec8b921cb9fcb8087034a26c60cf3 /searchcore
parent302d4358bdb28ab1f2ebfff20f1cafb2d04f8835 (diff)
Enable multiple benchmarks nodes in benchmark cluster.
Only show first 10 errors in message bus.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h5
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h1
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp79
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.h14
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h1
10 files changed, 95 insertions, 70 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 94ae9c87ce0..940dcf35449 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -428,6 +428,7 @@ App::get_options()
{ "get-passes", 1, nullptr, 0 },
{ "indexing-sequencer", 1, nullptr, 0 },
{ "max-pending", 1, nullptr, 0 },
+ { "nodes", 1, nullptr, 0 },
{ "put-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
{ "response-threads", 1, nullptr, 0 },
@@ -452,6 +453,7 @@ App::get_options()
LONGOPT_GET_PASSES,
LONGOPT_INDEXING_SEQUENCER,
LONGOPT_MAX_PENDING,
+ LONGOPT_NODES,
LONGOPT_PUT_PASSES,
LONGOPT_REMOVE_PASSES,
LONGOPT_RESPONSE_THREADS,
@@ -499,6 +501,9 @@ App::get_options()
case LONGOPT_MAX_PENDING:
_bm_params.set_max_pending(atoi(opt_argument));
break;
+ case LONGOPT_NODES:
+ _bm_params.set_num_nodes(atoi(opt_argument));
+ break;
case LONGOPT_PUT_PASSES:
_bm_params.set_put_passes(atoi(opt_argument));
break;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
index 820b8a06ab5..58011d9c67a 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster.cpp
@@ -210,7 +210,7 @@ BmCluster::make_node(unsigned int node_idx)
s << _base_dir << "/n" << node_idx;
vespalib::string node_base_dir(s.str());
int node_base_port = port_number(_base_port, PortBias::NUM_PORTS) + BmNode::num_ports() * node_idx;
- _nodes[node_idx] = BmNode::create(node_base_dir, node_base_port, node_idx, _params, _document_types, _slobrok_port);
+ _nodes[node_idx] = BmNode::create(node_base_dir, node_base_port, node_idx, *this, _params, _document_types, _slobrok_port);
}
void
@@ -244,9 +244,9 @@ void
BmCluster::create_buckets(BmFeed& feed)
{
LOG(info, "create %u buckets", feed.num_buckets());
- auto feed_handler = get_node(0).make_create_bucket_feed_handler(false);
+ auto& node = get_node(0);
for (unsigned int i = 0; i < feed.num_buckets(); ++i) {
- feed_handler->create_bucket(feed.make_bucket(i));
+ node.create_bucket(feed.make_bucket(i));
}
}
@@ -267,11 +267,17 @@ BmCluster::start_service_layers()
start_rpc_client();
for (const auto &node : _nodes) {
if (node) {
- node->wait_service_layer_slobrok(*this);
+ node->wait_service_layer_slobrok();
}
}
- BmClusterController fake_controller(get_rpc_client());
- fake_controller.set_cluster_up(false);
+ BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes());
+ unsigned int node_idx = 0;
+ for (const auto &node : _nodes) {
+ if (node) {
+ fake_controller.set_cluster_up(node_idx, false);
+ }
+ ++node_idx;
+ }
}
void
@@ -284,11 +290,17 @@ BmCluster::start_distributors()
}
for (const auto &node : _nodes) {
if (node) {
- node->wait_distributor_slobrok(*this);
+ node->wait_distributor_slobrok();
+ }
+ }
+ BmClusterController fake_controller(get_rpc_client(), _params.get_num_nodes());
+ unsigned int node_idx = 0;
+ for (const auto &node : _nodes) {
+ if (node) {
+ fake_controller.set_cluster_up(node_idx, true);
}
+ ++node_idx;
}
- BmClusterController fake_controller(get_rpc_client());
- fake_controller.set_cluster_up(true);
// Wait for bucket ownership transfer safe time
std::this_thread::sleep_for(2s);
}
@@ -298,7 +310,7 @@ BmCluster::create_feed_handlers()
{
for (const auto &node : _nodes) {
if (node) {
- node->create_feed_handler(_params, *this);
+ node->create_feed_handler(_params);
}
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
index 8d8f89a6fe7..bdf4b5fed58 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
@@ -8,6 +8,7 @@
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/slobrok/sbmirror.h>
+#include <vespa/vespalib/stllike/asciistream.h>
using storage::api::StorageMessageAddress;
using storage::rpc::SharedRpcResources;
@@ -18,9 +19,11 @@ namespace search::bmcluster {
namespace {
FRT_RPCRequest *
-make_set_cluster_state_request()
+make_set_cluster_state_request(unsigned int num_nodes)
{
- storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1"));
+ vespalib::asciistream s;
+ s << "version:2 distributor:" << num_nodes << " storage:" << num_nodes;
+ storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState(s.str()));
storage::rpc::SlimeClusterStateBundleCodec codec;
auto encoded_bundle = codec.encode(bundle);
auto *req = new FRT_RPCRequest();
@@ -34,17 +37,18 @@ make_set_cluster_state_request()
}
-BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in)
- : _shared_rpc_resources(shared_rpc_resources_in)
+BmClusterController::BmClusterController(SharedRpcResources& shared_rpc_resources_in, unsigned int num_nodes)
+ : _shared_rpc_resources(shared_rpc_resources_in),
+ _num_nodes(num_nodes)
{
}
void
-BmClusterController::set_cluster_up(bool distributor)
+BmClusterController::set_cluster_up(unsigned int node_idx, bool distributor)
{
static vespalib::string _storage("storage");
- StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0);
- auto req = make_set_cluster_state_request();
+ StorageMessageAddress storage_address(&_storage, distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, node_idx);
+ auto req = make_set_cluster_state_request(_num_nodes);
auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(),
_shared_rpc_resources.target_factory(), 1);
uint64_t fake_bucket_id = 0;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
index d4e26e2f4fd..7b4313453f1 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.h
@@ -13,9 +13,10 @@ namespace search::bmcluster {
class BmClusterController
{
storage::rpc::SharedRpcResources& _shared_rpc_resources;
+ unsigned int _num_nodes;
public:
- BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in);
- void set_cluster_up(bool distributor);
+ BmClusterController(storage::rpc::SharedRpcResources& shared_rpc_resources_in, unsigned int num_nodes);
+ void set_cluster_up(unsigned int node_idx, bool distributor);
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
index 0f28468c5b5..5bc6b97487c 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -54,6 +54,7 @@ public:
void set_enable_distributor(bool value) { _enable_distributor = value; }
void set_enable_service_layer(bool value) { _enable_service_layer = value; }
void set_indexing_sequencer(vespalib::stringref sequencer) { _indexing_sequencer = sequencer; }
+ void set_num_nodes(uint32_t value) { _num_nodes = value; }
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp
index fc6acbcab12..d947ca5e109 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_message_bus.cpp
@@ -111,7 +111,9 @@ BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply)
}
if (failed) {
++_errors;
- LOG(error, "Unexpected %s", reply_as_string(*reply).c_str());
+ if (_errors <= 10) {
+ LOG(error, "Unexpected %s", reply_as_string(*reply).c_str());
+ }
}
tracker->release();
} else {
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 2e2d541b43d..84bd921620d 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -146,6 +146,8 @@ int port_number(int base_port, PortBias bias)
return base_port + static_cast<int>(bias);
}
+storage::spi::Context context(storage::spi::Priority(0), 0);
+
}
std::shared_ptr<AttributesConfig> make_attributes_config() {
@@ -277,7 +279,7 @@ struct StorageConfigSet
SlobroksConfigBuilder slobroks;
MessagebusConfigBuilder messagebus;
- StorageConfigSet(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ StorageConfigSet(const vespalib::string &base_dir, unsigned int node_idx, bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
: config_id(config_id_in),
documenttypes(documenttypes_in),
@@ -299,9 +301,11 @@ struct StorageConfigSet
{
StorDistributionConfigBuilder::Group group;
{
- StorDistributionConfigBuilder::Group::Nodes node;
- node.index = 0;
- group.nodes.push_back(std::move(node));
+ for (unsigned int i = 0; i < params.get_num_nodes(); ++i) {
+ StorDistributionConfigBuilder::Group::Nodes node;
+ node.index = i;
+ group.nodes.push_back(std::move(node));
+ }
}
group.index = "invalid";
group.name = "invalid";
@@ -312,12 +316,13 @@ struct StorageConfigSet
dc.redundancy = 1;
dc.readyCopies = 1;
}
+ stor_server.nodeIndex = node_idx;
stor_server.isDistributor = distributor;
stor_server.contentNodeBucketDbStripeBits = params.get_bucket_db_stripe_bits();
if (distributor) {
- stor_server.rootFolder = "distributor";
+ stor_server.rootFolder = base_dir + "/distributor";
} else {
- stor_server.rootFolder = "storage";
+ stor_server.rootFolder = base_dir + "/storage";
}
make_slobroks_config(slobroks, slobrok_port);
stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
@@ -359,9 +364,9 @@ struct ServiceLayerConfigSet : public StorageConfigSet
StorBucketInitConfigBuilder stor_bucket_init;
StorVisitorConfigBuilder stor_visitor;
- ServiceLayerConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ ServiceLayerConfigSet(const vespalib::string& base_dir, unsigned int node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
- : StorageConfigSet(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ : StorageConfigSet(base_dir, node_idx, false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
persistence(),
stor_filestor(),
stor_bucket_init(),
@@ -390,9 +395,9 @@ struct DistributorConfigSet : public StorageConfigSet
StorDistributormanagerConfigBuilder stor_distributormanager;
StorVisitordispatcherConfigBuilder stor_visitordispatcher;
- DistributorConfigSet(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
+ DistributorConfigSet(const vespalib::string& base_dir, unsigned int node_idx, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in,
int slobrok_port, int mbus_port, int rpc_port, int status_port, const BmClusterParams& params)
- : StorageConfigSet(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
+ : StorageConfigSet(base_dir, node_idx, true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, params),
stor_distributormanager(),
stor_visitordispatcher()
{
@@ -416,13 +421,14 @@ BmNode::~BmNode() = default;
class MyBmNode : public BmNode
{
+ BmCluster& _cluster;
std::shared_ptr<DocumenttypesConfig> _document_types;
std::shared_ptr<const DocumentTypeRepo> _repo;
proton::DocTypeName _doc_type_name;
std::shared_ptr<DocumentDBConfig> _document_db_config;
vespalib::string _base_dir;
search::index::DummyFileHeaderContext _file_header_context;
- int _node_idx;
+ unsigned int _node_idx;
int _tls_listen_port;
int _slobrok_port;
int _service_layer_mbus_port;
@@ -459,25 +465,26 @@ class MyBmNode : public BmNode
void create_document_db(const BmClusterParams& params);
public:
- MyBmNode(const vespalib::string &base_dir, int base_port, int node_idx, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port);
+ MyBmNode(const vespalib::string &base_dir, int base_port, unsigned int node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port);
~MyBmNode() override;
void initialize_persistence_provider() override;
- std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) override;
+ void create_bucket(const document::Bucket& bucket) override;
void start_service_layer(const BmClusterParams& params) override;
void wait_service_layer() override;
void start_distributor(const BmClusterParams& params) override;
- void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) override;
+ void create_feed_handler(const BmClusterParams& params) override;
void shutdown_feed_handler() override;
void shutdown_distributor() override;
void shutdown_service_layer() override;
- void wait_service_layer_slobrok(BmCluster& cluster) override;
- void wait_distributor_slobrok(BmCluster& cluster) override;
+ void wait_service_layer_slobrok() override;
+ void wait_distributor_slobrok() override;
IBmFeedHandler* get_feed_handler() override;
PersistenceProvider* get_persistence_provider() override;
};
-MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, int node_idx, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
+MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, unsigned int node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
: BmNode(),
+ _cluster(cluster),
_document_types(std::move(document_types)),
_repo(document::DocumentTypeRepoFactory::make(*_document_types)),
_doc_type_name("test"),
@@ -508,8 +515,8 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, int node_idx
_disk_mem_usage_notifier(),
_persistence_engine(),
_field_set_repo(std::make_unique<const document::FieldSetRepo>(*_repo)),
- _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
- _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
+ _service_layer_config(_base_dir, _node_idx, "bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
+ _distributor_config(_base_dir, _node_idx, "bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
_config_set(),
_config_context(std::make_shared<config::ConfigContext>(_config_set)),
_feed_handler(),
@@ -583,10 +590,10 @@ MyBmNode::initialize_persistence_provider()
get_persistence_provider()->initialize();
}
-std::unique_ptr<SpiBmFeedHandler>
-MyBmNode::make_create_bucket_feed_handler(bool skip_get_spi_bucket_info)
+void
+MyBmNode::create_bucket(const document::Bucket& bucket)
{
- return std::make_unique<SpiBmFeedHandler>(*_persistence_engine, *_field_set_repo, skip_get_spi_bucket_info);
+ get_persistence_provider()->createBucket(storage::spi::Bucket(bucket), context);
}
void
@@ -629,31 +636,31 @@ MyBmNode::start_distributor(const BmClusterParams& params)
}
void
-MyBmNode::create_feed_handler(const BmClusterParams& params, BmCluster& cluster)
+MyBmNode::create_feed_handler(const BmClusterParams& params)
{
StorageApiRpcService::Params rpc_params;
// This is the same compression config as the default in stor-communicationmanager.def.
rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
rpc_params.num_rpc_targets_per_node = params.get_rpc_targets_per_node();
if (params.get_use_document_api()) {
- _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(cluster.get_message_bus());
+ _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(_cluster.get_message_bus());
} else if (params.get_enable_distributor()) {
if (params.get_use_storage_chain()) {
assert(_distributor_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
} else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), true);
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(_cluster.get_message_bus(), true);
} else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, true);
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(_cluster.get_rpc_client(), _repo, rpc_params, true);
}
} else if (params.needs_service_layer()) {
if (params.get_use_storage_chain()) {
assert(_service_layer_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
} else if (params.get_use_message_bus()) {
- _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(cluster.get_message_bus(), false);
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(_cluster.get_message_bus(), false);
} else {
- _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(cluster.get_rpc_client(), _repo, rpc_params, false);
+ _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(_cluster.get_rpc_client(), _repo, rpc_params, false);
}
}
}
@@ -697,23 +704,23 @@ MyBmNode::get_persistence_provider()
}
void
-MyBmNode::wait_service_layer_slobrok(BmCluster& cluster)
+MyBmNode::wait_service_layer_slobrok()
{
vespalib::asciistream s;
s << "storage/cluster.storage/storage/" << _node_idx;
- cluster.wait_slobrok(s.str());
+ _cluster.wait_slobrok(s.str());
s << "/default";
- cluster.wait_slobrok(s.str());
+ _cluster.wait_slobrok(s.str());
}
void
-MyBmNode::wait_distributor_slobrok(BmCluster& cluster)
+MyBmNode::wait_distributor_slobrok()
{
vespalib::asciistream s;
s << "storage/cluster.storage/distributor/" << _node_idx;
- cluster.wait_slobrok(s.str());
+ _cluster.wait_slobrok(s.str());
s << "/default";
- cluster.wait_slobrok(s.str());
+ _cluster.wait_slobrok(s.str());
}
unsigned int
@@ -723,9 +730,9 @@ BmNode::num_ports()
}
std::unique_ptr<BmNode>
-BmNode::create(const vespalib::string& base_dir, int base_port, int node_idx, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
+BmNode::create(const vespalib::string& base_dir, int base_port, unsigned int node_idx, BmCluster &cluster, const BmClusterParams& params, std::shared_ptr<document::DocumenttypesConfig> document_types, int slobrok_port)
{
- return std::make_unique<MyBmNode>(base_dir, base_port, node_idx, params, std::move(document_types), slobrok_port);
+ return std::make_unique<MyBmNode>(base_dir, base_port, node_idx, cluster, params, std::move(document_types), slobrok_port);
}
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
index e4afc3ed693..3647981f58b 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.h
@@ -7,6 +7,7 @@
namespace document {
+class Bucket;
class DocumentTypeRepo;
class DocumentType;
class Field;
@@ -22,7 +23,6 @@ namespace search::bmcluster {
class BmCluster;
class BmClusterParams;
class IBmFeedHandler;
-class SpiBmFeedHandler;
/*
* Class representing a single benchmark node in a benchmark cluster.
@@ -33,21 +33,21 @@ protected:
BmNode();
public:
virtual ~BmNode();
- virtual void initialize_persistence_provider()= 0;
- virtual std::unique_ptr<SpiBmFeedHandler> make_create_bucket_feed_handler(bool skip_get_spi_bucket_info) = 0;
+ virtual void initialize_persistence_provider() = 0;
+ virtual void create_bucket(const document::Bucket& bucket) = 0;
virtual void start_service_layer(const BmClusterParams& params) = 0;
virtual void wait_service_layer() = 0;
virtual void start_distributor(const BmClusterParams& params) = 0;
- virtual void create_feed_handler(const BmClusterParams& params, BmCluster& cluster) = 0;
+ virtual void create_feed_handler(const BmClusterParams& params) = 0;
virtual void shutdown_feed_handler() = 0;
virtual void shutdown_distributor() = 0;
virtual void shutdown_service_layer() = 0;
- virtual void wait_service_layer_slobrok(BmCluster& cluster) = 0;
- virtual void wait_distributor_slobrok(BmCluster& cluster) = 0;
+ virtual void wait_service_layer_slobrok() = 0;
+ virtual void wait_distributor_slobrok() = 0;
virtual IBmFeedHandler* get_feed_handler() = 0;
virtual storage::spi::PersistenceProvider *get_persistence_provider() = 0;
static unsigned int num_ports();
- static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, int node_idx, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port);
+ static std::unique_ptr<BmNode> create(const vespalib::string &base_dir, int base_port, unsigned int node_idx, BmCluster& cluster, const BmClusterParams& params, std::shared_ptr<const document::internal::InternalDocumenttypesType> document_types, int slobrok_port);
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index 2aa7ee5ec47..e905b493cf4 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -124,12 +124,6 @@ SpiBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_
}
void
-SpiBmFeedHandler::create_bucket(const document::Bucket& bucket)
-{
- _provider.createBucket(Bucket(bucket), context);
-}
-
-void
SpiBmFeedHandler::attach_bucket_info_queue(PendingTracker& tracker)
{
if (!_skip_get_spi_bucket_info) {
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
index 3a24a5b38b7..bbc9e3b8e74 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.h
@@ -27,7 +27,6 @@ public:
void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
- void create_bucket(const document::Bucket& bucket);
void attach_bucket_info_queue(PendingTracker &tracker) override;
uint32_t get_error_count() const override;
const vespalib::string &get_name() const override;