diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-09-14 18:39:41 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-09-14 18:39:41 +0200 |
commit | 2d5400b085f0c7155c7c74d8bef06d280cf84cce (patch) | |
tree | fddd8c45e986e29a471d10318a379f408fdf2907 | |
parent | 4bd1e39ad8e78fb232abbfa425b7e7bbdb839497 (diff) |
Move shared code to base class.
9 files changed, 137 insertions, 233 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 44e13c5954e..17f9bff38d3 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -17,6 +17,7 @@ vespa_add_library(searchcore_bmcluster STATIC pending_tracker.cpp pending_tracker_hash.cpp spi_bm_feed_handler.cpp + storage_api_bm_feed_handler_base.cpp storage_api_chain_bm_feed_handler.cpp storage_api_message_bus_bm_feed_handler.cpp storage_api_rpc_bm_feed_handler.cpp diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp new file mode 100644 index 00000000000..cf7b78aaab9 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp @@ -0,0 +1,72 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_bm_feed_handler_base.h" +#include "i_bm_distribution.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storageapi/message/persistence.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; + +namespace search::bmcluster { + +StorageApiBmFeedHandlerBase::StorageApiBmFeedHandlerBase(const vespalib::string &base_name, const IBmDistribution &distribution, bool distributor) + : _name(base_name + "(" + (distributor ? "distributor" : "service-layer") + ")"), + _distribution(distribution), + _distributor(distributor) +{ +} + +StorageApiBmFeedHandlerBase::~StorageApiBmFeedHandlerBase() = default; + +uint32_t +StorageApiBmFeedHandlerBase::route_cmd(storage::api::StorageCommand& cmd) +{ + auto bucket = cmd.getBucket(); + cmd.setSourceIndex(_distributor ? 0 : _distribution.get_distributor_node_idx(bucket)); + return _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); +} + +void +StorageApiBmFeedHandlerBase::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_cmd(std::move(cmd), tracker); +} + +void +StorageApiBmFeedHandlerBase::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_cmd(std::move(cmd), tracker); +} + +void +StorageApiBmFeedHandlerBase::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_cmd(std::move(cmd), tracker); +} + +void +StorageApiBmFeedHandlerBase::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); + send_cmd(std::move(cmd), tracker); +} + +const vespalib::string& +StorageApiBmFeedHandlerBase::get_name() const +{ + return _name; +} + +bool +StorageApiBmFeedHandlerBase::manages_timestamp() const +{ + return _distributor; +} + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h new file mode 100644 index 00000000000..150b19ccb10 --- /dev/null +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace storage::api { class StorageCommand; } + +namespace search::bmcluster { + +class IBmDistribution; + +/* + * Base class for benchmark feed handlers that feed to service layer + * or distributor using storage api protocol. + */ +class StorageApiBmFeedHandlerBase : public IBmFeedHandler +{ +protected: + vespalib::string _name; + const IBmDistribution& _distribution; + bool _distributor; + + virtual void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) = 0; + uint32_t route_cmd(storage::api::StorageCommand& cmd); +public: + StorageApiBmFeedHandlerBase(const vespalib::string& base_name, const IBmDistribution &distribution, bool distributor); + ~StorageApiBmFeedHandlerBase(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; + const vespalib::string &get_name() const override; + bool manages_timestamp() const override; +}; + +} diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp index f7df593a75e..3d9c55ab6e7 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp @@ -3,49 +3,28 @@ #include "storage_api_chain_bm_feed_handler.h" #include "i_bm_distribution.h" #include "pending_tracker.h" -#include "storage_reply_error_checker.h" #include "bm_storage_link_context.h" #include "bm_storage_link.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/state.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> -#include <cassert> - -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; +#include <vespa/storageapi/messageapi/storagecommand.h> namespace search::bmcluster { StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), + : StorageApiBmFeedHandlerBase("StorageApiChainBmFeedHandler", distribution, distributor), _contexts(std::move(contexts)), - _no_link_error_count(0u), - _distribution(distribution) + _no_link_error_count(0u) { } StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default; void -StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +StorageApiChainBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) { - auto bucket = cmd->getBucket(); - if (_distributor) { - cmd->setSourceIndex(0); - } else { - cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); - } - uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + uint32_t node_idx = route_cmd(*cmd); if (node_idx < _contexts.size() && _contexts[node_idx]) { auto bm_link = _contexts[node_idx]->bm_link; - bm_link->retain(cmd->getMsgId(), pending_tracker); + bm_link->retain(cmd->getMsgId(), tracker); bm_link->sendDown(std::move(cmd)); } else { ++_no_link_error_count; @@ -53,34 +32,6 @@ StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageComm } void -StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiChainBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_msg(std::move(cmd), tracker); -} - -void StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&) { } @@ -98,16 +49,4 @@ StorageApiChainBmFeedHandler::get_error_count() const return error_count; } -const vespalib::string& -StorageApiChainBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiChainBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - } diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h index d4ba06403e3..41d61cf9b13 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h @@ -2,7 +2,7 @@ #pragma once -#include "i_bm_feed_handler.h" +#include "storage_api_bm_feed_handler_base.h" #include <vector> #include <atomic> @@ -17,28 +17,18 @@ class IBmDistribution; * Benchmark feed handler for feed to service layer or distributor * using storage api protocol directly on the storage chain. */ -class StorageApiChainBmFeedHandler : public IBmFeedHandler +class StorageApiChainBmFeedHandler : public StorageApiBmFeedHandlerBase { private: - vespalib::string _name; - bool _distributor; std::vector<std::shared_ptr<BmStorageLinkContext>> _contexts; std::atomic<uint32_t> _no_link_error_count; - const IBmDistribution& _distribution; - void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); + void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override; public: StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor); ~StorageApiChainBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; - void attach_bucket_info_queue(PendingTracker &tracker) override; uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp index 4f84be508db..4332f399c85 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp @@ -3,82 +3,34 @@ #include "storage_api_message_bus_bm_feed_handler.h" #include "bm_message_bus.h" #include "i_bm_distribution.h" -#include "pending_tracker.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/mbusprot/storagecommand.h> -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; -using storage::api::StorageMessageAddress; -using storage::lib::NodeType; - namespace search::bmcluster { StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), + : StorageApiBmFeedHandlerBase("StorageApiMessageBusBmFeedHandler", distribution, distributor), _message_bus(message_bus), _routes(distribution.get_num_nodes(), distributor), - _no_route_error_count(0), - _distribution(distribution) + _no_route_error_count(0) { } StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default; void -StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +StorageApiMessageBusBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) { - auto bucket = cmd->getBucket(); - if (_distributor) { - cmd->setSourceIndex(0); - } else { - cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); - } - auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); - uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + uint32_t node_idx = route_cmd(*cmd); if (_routes.has_route(node_idx)) { + auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); auto& route = _routes.get_route(node_idx); - _message_bus.send_msg(std::move(msg), route, pending_tracker); + _message_bus.send_msg(std::move(msg), route, tracker); } else { ++_no_route_error_count; } } void -StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_msg(std::move(cmd), tracker); -} - -void -StorageApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_msg(std::move(cmd), tracker); -} - -void StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&) { } @@ -89,16 +41,4 @@ StorageApiMessageBusBmFeedHandler::get_error_count() const return _message_bus.get_error_count() + _no_route_error_count; } -const vespalib::string& -StorageApiMessageBusBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiMessageBusBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - } diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h index c02e594964a..9bfd6e25e03 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h @@ -2,7 +2,7 @@ #pragma once -#include "i_bm_feed_handler.h" +#include "storage_api_bm_feed_handler_base.h" #include "bm_message_bus_routes.h" #include <atomic> @@ -17,26 +17,17 @@ class IBmDistribution; * Benchmark feed handler for feed to service layer or distributor * using storage api protocol over message bus. */ -class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler +class StorageApiMessageBusBmFeedHandler : public StorageApiBmFeedHandlerBase { - vespalib::string _name; - bool _distributor; BmMessageBus& _message_bus; BmMessageBusRoutes _routes; std::atomic<uint32_t> _no_route_error_count; - const IBmDistribution& _distribution; - void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); + void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override; public: StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor); ~StorageApiMessageBusBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; void attach_bucket_info_queue(PendingTracker &tracker) override; uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; }; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp index 57d50853c71..435c6bf8869 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp @@ -5,23 +5,15 @@ #include "pending_tracker.h" #include "pending_tracker_hash.h" #include "storage_reply_error_checker.h" -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/storageapi/messageapi/storagemessage.h> -#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/messageapi/storagecommand.h> #include <vespa/storage/storageserver/message_dispatcher.h> #include <vespa/storage/storageserver/rpc/message_codec_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> -using document::Document; -using document::DocumentId; -using document::DocumentUpdate; using document::DocumentTypeRepo; -using storage::api::StorageMessageAddress; using storage::rpc::SharedRpcResources; using storage::rpc::StorageApiRpcService; -using storage::lib::NodeType; namespace search::bmcluster { @@ -65,34 +57,25 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share const StorageApiRpcService::Params& rpc_params, const IBmDistribution& distribution, bool distributor) - : IBmFeedHandler(), - _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), - _distributor(distributor), + : StorageApiBmFeedHandlerBase("StorageApiRpcBmFeedHandler", distribution, distributor), _addresses(distribution.get_num_nodes(), distributor), _no_address_error_count(0u), _shared_rpc_resources(shared_rpc_resources_in), _message_dispatcher(std::make_unique<MyMessageDispatcher>()), _message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo)), - _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)), - _distribution(distribution) + _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)) { } StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; void -StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +StorageApiRpcBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) { - auto bucket = cmd->getBucket(); - if (_distributor) { - cmd->setSourceIndex(0); - } else { - cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket)); - } - uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket); + uint32_t node_idx = route_cmd(*cmd); if (_addresses.has_address(node_idx)) { cmd->setAddress(_addresses.get_address(node_idx)); - _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); + _message_dispatcher->retain(cmd->getMsgId(), tracker); _rpc_client->send_rpc_v1_request(std::move(cmd)); } else { ++_no_address_error_count; @@ -100,34 +83,6 @@ StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageComman } void -StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); - send_rpc(std::move(cmd), tracker); -} - -void -StorageApiRpcBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) -{ - auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string); - send_rpc(std::move(cmd), tracker); -} - -void StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&) { } @@ -138,16 +93,4 @@ StorageApiRpcBmFeedHandler::get_error_count() const return _message_dispatcher->get_error_count() + _no_address_error_count; } -const vespalib::string& -StorageApiRpcBmFeedHandler::get_name() const -{ - return _name; -} - -bool -StorageApiRpcBmFeedHandler::manages_timestamp() const -{ - return _distributor; -} - } diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h index 48b6233d0a4..25e1bd8de06 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h @@ -2,7 +2,7 @@ #pragma once -#include "i_bm_feed_handler.h" +#include "storage_api_bm_feed_handler_base.h" #include "bm_storage_message_addresses.h" #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> @@ -25,20 +25,17 @@ class IBmDistribution; * Benchmark feed handler for feed to service layer or distributor * using storage api protocol over rpc. */ -class StorageApiRpcBmFeedHandler : public IBmFeedHandler +class StorageApiRpcBmFeedHandler : public StorageApiBmFeedHandlerBase { class MyMessageDispatcher; - vespalib::string _name; - bool _distributor; BmStorageMessageAddresses _addresses; std::atomic<uint32_t> _no_address_error_count; storage::rpc::SharedRpcResources& _shared_rpc_resources; std::unique_ptr<MyMessageDispatcher> _message_dispatcher; std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider; std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client; - const IBmDistribution& _distribution; - void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); + void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override; public: StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in, std::shared_ptr<const document::DocumentTypeRepo> repo, @@ -46,14 +43,8 @@ public: const IBmDistribution& distribution, bool distributor); ~StorageApiRpcBmFeedHandler(); - void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; - void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; - void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; - void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override; void attach_bucket_info_queue(PendingTracker &tracker) override; uint32_t get_error_count() const override; - const vespalib::string &get_name() const override; - bool manages_timestamp() const override; }; } |