aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-09-14 18:39:41 +0200
committerTor Egge <Tor.Egge@online.no>2021-09-14 18:39:41 +0200
commit2d5400b085f0c7155c7c74d8bef06d280cf84cce (patch)
treefddd8c45e986e29a471d10318a379f408fdf2907
parent4bd1e39ad8e78fb232abbfa425b7e7bbdb839497 (diff)
Move shared code to base class.
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp72
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h37
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp73
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h16
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp72
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h15
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp69
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h15
9 files changed, 137 insertions, 233 deletions
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 44e13c5954e..17f9bff38d3 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -17,6 +17,7 @@ vespa_add_library(searchcore_bmcluster STATIC
pending_tracker.cpp
pending_tracker_hash.cpp
spi_bm_feed_handler.cpp
+ storage_api_bm_feed_handler_base.cpp
storage_api_chain_bm_feed_handler.cpp
storage_api_message_bus_bm_feed_handler.cpp
storage_api_rpc_bm_feed_handler.cpp
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp
new file mode 100644
index 00000000000..cf7b78aaab9
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.cpp
@@ -0,0 +1,72 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_bm_feed_handler_base.h"
+#include "i_bm_distribution.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storageapi/message/persistence.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+
+namespace search::bmcluster {
+
+StorageApiBmFeedHandlerBase::StorageApiBmFeedHandlerBase(const vespalib::string &base_name, const IBmDistribution &distribution, bool distributor)
+ : _name(base_name + "(" + (distributor ? "distributor" : "service-layer") + ")"),
+ _distribution(distribution),
+ _distributor(distributor)
+{
+}
+
+StorageApiBmFeedHandlerBase::~StorageApiBmFeedHandlerBase() = default;
+
+uint32_t
+StorageApiBmFeedHandlerBase::route_cmd(storage::api::StorageCommand& cmd)
+{
+ auto bucket = cmd.getBucket();
+ cmd.setSourceIndex(_distributor ? 0 : _distribution.get_distributor_node_idx(bucket));
+ return _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+}
+
+void
+StorageApiBmFeedHandlerBase::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_cmd(std::move(cmd), tracker);
+}
+
+void
+StorageApiBmFeedHandlerBase::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_cmd(std::move(cmd), tracker);
+}
+
+void
+StorageApiBmFeedHandlerBase::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_cmd(std::move(cmd), tracker);
+}
+
+void
+StorageApiBmFeedHandlerBase::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
+ send_cmd(std::move(cmd), tracker);
+}
+
+const vespalib::string&
+StorageApiBmFeedHandlerBase::get_name() const
+{
+ return _name;
+}
+
+bool
+StorageApiBmFeedHandlerBase::manages_timestamp() const
+{
+ return _distributor;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h
new file mode 100644
index 00000000000..150b19ccb10
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_bm_feed_handler_base.h
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace storage::api { class StorageCommand; }
+
+namespace search::bmcluster {
+
+class IBmDistribution;
+
+/*
+ * Base class for benchmark feed handlers that feed to service layer
+ * or distributor using storage api protocol.
+ */
+class StorageApiBmFeedHandlerBase : public IBmFeedHandler
+{
+protected:
+ vespalib::string _name;
+ const IBmDistribution& _distribution;
+ bool _distributor;
+
+ virtual void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) = 0;
+ uint32_t route_cmd(storage::api::StorageCommand& cmd);
+public:
+ StorageApiBmFeedHandlerBase(const vespalib::string& base_name, const IBmDistribution &distribution, bool distributor);
+ ~StorageApiBmFeedHandlerBase();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
+ const vespalib::string &get_name() const override;
+ bool manages_timestamp() const override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
index f7df593a75e..3d9c55ab6e7 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.cpp
@@ -3,49 +3,28 @@
#include "storage_api_chain_bm_feed_handler.h"
#include "i_bm_distribution.h"
#include "pending_tracker.h"
-#include "storage_reply_error_checker.h"
#include "bm_storage_link_context.h"
#include "bm_storage_link.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/state.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vdslib/state/cluster_state_bundle.h>
-#include <cassert>
-
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
+#include <vespa/storageapi/messageapi/storagecommand.h>
namespace search::bmcluster {
StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
+ : StorageApiBmFeedHandlerBase("StorageApiChainBmFeedHandler", distribution, distributor),
_contexts(std::move(contexts)),
- _no_link_error_count(0u),
- _distribution(distribution)
+ _no_link_error_count(0u)
{
}
StorageApiChainBmFeedHandler::~StorageApiChainBmFeedHandler() = default;
void
-StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+StorageApiChainBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker)
{
- auto bucket = cmd->getBucket();
- if (_distributor) {
- cmd->setSourceIndex(0);
- } else {
- cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
- }
- uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ uint32_t node_idx = route_cmd(*cmd);
if (node_idx < _contexts.size() && _contexts[node_idx]) {
auto bm_link = _contexts[node_idx]->bm_link;
- bm_link->retain(cmd->getMsgId(), pending_tracker);
+ bm_link->retain(cmd->getMsgId(), tracker);
bm_link->sendDown(std::move(cmd));
} else {
++_no_link_error_count;
@@ -53,34 +32,6 @@ StorageApiChainBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageComm
}
void
-StorageApiChainBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiChainBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_msg(std::move(cmd), tracker);
-}
-
-void
StorageApiChainBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
{
}
@@ -98,16 +49,4 @@ StorageApiChainBmFeedHandler::get_error_count() const
return error_count;
}
-const vespalib::string&
-StorageApiChainBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiChainBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
index d4ba06403e3..41d61cf9b13 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_chain_bm_feed_handler.h
@@ -2,7 +2,7 @@
#pragma once
-#include "i_bm_feed_handler.h"
+#include "storage_api_bm_feed_handler_base.h"
#include <vector>
#include <atomic>
@@ -17,28 +17,18 @@ class IBmDistribution;
* Benchmark feed handler for feed to service layer or distributor
* using storage api protocol directly on the storage chain.
*/
-class StorageApiChainBmFeedHandler : public IBmFeedHandler
+class StorageApiChainBmFeedHandler : public StorageApiBmFeedHandlerBase
{
private:
- vespalib::string _name;
- bool _distributor;
std::vector<std::shared_ptr<BmStorageLinkContext>> _contexts;
std::atomic<uint32_t> _no_link_error_count;
- const IBmDistribution& _distribution;
- void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+ void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override;
public:
StorageApiChainBmFeedHandler(std::vector<std::shared_ptr<BmStorageLinkContext>> contexts, const IBmDistribution& distribution, bool distributor);
~StorageApiChainBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
-
void attach_bucket_info_queue(PendingTracker &tracker) override;
uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
index 4f84be508db..4332f399c85 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.cpp
@@ -3,82 +3,34 @@
#include "storage_api_message_bus_bm_feed_handler.h"
#include "bm_message_bus.h"
#include "i_bm_distribution.h"
-#include "pending_tracker.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/mbusprot/storagecommand.h>
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
-using storage::api::StorageMessageAddress;
-using storage::lib::NodeType;
-
namespace search::bmcluster {
StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
+ : StorageApiBmFeedHandlerBase("StorageApiMessageBusBmFeedHandler", distribution, distributor),
_message_bus(message_bus),
_routes(distribution.get_num_nodes(), distributor),
- _no_route_error_count(0),
- _distribution(distribution)
+ _no_route_error_count(0)
{
}
StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default;
void
-StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+StorageApiMessageBusBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker)
{
- auto bucket = cmd->getBucket();
- if (_distributor) {
- cmd->setSourceIndex(0);
- } else {
- cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
- }
- auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
- uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ uint32_t node_idx = route_cmd(*cmd);
if (_routes.has_route(node_idx)) {
+ auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
auto& route = _routes.get_route(node_idx);
- _message_bus.send_msg(std::move(msg), route, pending_tracker);
+ _message_bus.send_msg(std::move(msg), route, tracker);
} else {
++_no_route_error_count;
}
}
void
-StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_msg(std::move(cmd), tracker);
-}
-
-void
-StorageApiMessageBusBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_msg(std::move(cmd), tracker);
-}
-
-void
StorageApiMessageBusBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
{
}
@@ -89,16 +41,4 @@ StorageApiMessageBusBmFeedHandler::get_error_count() const
return _message_bus.get_error_count() + _no_route_error_count;
}
-const vespalib::string&
-StorageApiMessageBusBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiMessageBusBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
index c02e594964a..9bfd6e25e03 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_message_bus_bm_feed_handler.h
@@ -2,7 +2,7 @@
#pragma once
-#include "i_bm_feed_handler.h"
+#include "storage_api_bm_feed_handler_base.h"
#include "bm_message_bus_routes.h"
#include <atomic>
@@ -17,26 +17,17 @@ class IBmDistribution;
* Benchmark feed handler for feed to service layer or distributor
* using storage api protocol over message bus.
*/
-class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
+class StorageApiMessageBusBmFeedHandler : public StorageApiBmFeedHandlerBase
{
- vespalib::string _name;
- bool _distributor;
BmMessageBus& _message_bus;
BmMessageBusRoutes _routes;
std::atomic<uint32_t> _no_route_error_count;
- const IBmDistribution& _distribution;
- void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+ void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override;
public:
StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, const IBmDistribution& distribution, bool distributor);
~StorageApiMessageBusBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
void attach_bucket_info_queue(PendingTracker &tracker) override;
uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
};
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
index 57d50853c71..435c6bf8869 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.cpp
@@ -5,23 +5,15 @@
#include "pending_tracker.h"
#include "pending_tracker_hash.h"
#include "storage_reply_error_checker.h"
-#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/storageapi/messageapi/storagemessage.h>
-#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/messageapi/storagecommand.h>
#include <vespa/storage/storageserver/message_dispatcher.h>
#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-using document::Document;
-using document::DocumentId;
-using document::DocumentUpdate;
using document::DocumentTypeRepo;
-using storage::api::StorageMessageAddress;
using storage::rpc::SharedRpcResources;
using storage::rpc::StorageApiRpcService;
-using storage::lib::NodeType;
namespace search::bmcluster {
@@ -65,34 +57,25 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share
const StorageApiRpcService::Params& rpc_params,
const IBmDistribution& distribution,
bool distributor)
- : IBmFeedHandler(),
- _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
- _distributor(distributor),
+ : StorageApiBmFeedHandlerBase("StorageApiRpcBmFeedHandler", distribution, distributor),
_addresses(distribution.get_num_nodes(), distributor),
_no_address_error_count(0u),
_shared_rpc_resources(shared_rpc_resources_in),
_message_dispatcher(std::make_unique<MyMessageDispatcher>()),
_message_codec_provider(std::make_unique<storage::rpc::MessageCodecProvider>(repo)),
- _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params)),
- _distribution(distribution)
+ _rpc_client(std::make_unique<storage::rpc::StorageApiRpcService>(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, rpc_params))
{
}
StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default;
void
-StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+StorageApiRpcBmFeedHandler::send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker)
{
- auto bucket = cmd->getBucket();
- if (_distributor) {
- cmd->setSourceIndex(0);
- } else {
- cmd->setSourceIndex(_distribution.get_distributor_node_idx(bucket));
- }
- uint32_t node_idx = _distributor ? _distribution.get_distributor_node_idx(bucket) : _distribution.get_service_layer_node_idx(bucket);
+ uint32_t node_idx = route_cmd(*cmd);
if (_addresses.has_address(node_idx)) {
cmd->setAddress(_addresses.get_address(node_idx));
- _message_dispatcher->retain(cmd->getMsgId(), pending_tracker);
+ _message_dispatcher->retain(cmd->getMsgId(), tracker);
_rpc_client->send_rpc_v1_request(std::move(cmd));
} else {
++_no_address_error_count;
@@ -100,34 +83,6 @@ StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr<storage::api::StorageComman
}
void
-StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
-StorageApiRpcBmFeedHandler::get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker)
-{
- auto cmd = std::make_unique<storage::api::GetCommand>(bucket, document_id, field_set_string);
- send_rpc(std::move(cmd), tracker);
-}
-
-void
StorageApiRpcBmFeedHandler::attach_bucket_info_queue(PendingTracker&)
{
}
@@ -138,16 +93,4 @@ StorageApiRpcBmFeedHandler::get_error_count() const
return _message_dispatcher->get_error_count() + _no_address_error_count;
}
-const vespalib::string&
-StorageApiRpcBmFeedHandler::get_name() const
-{
- return _name;
-}
-
-bool
-StorageApiRpcBmFeedHandler::manages_timestamp() const
-{
- return _distributor;
-}
-
}
diff --git a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
index 48b6233d0a4..25e1bd8de06 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/storage_api_rpc_bm_feed_handler.h
@@ -2,7 +2,7 @@
#pragma once
-#include "i_bm_feed_handler.h"
+#include "storage_api_bm_feed_handler_base.h"
#include "bm_storage_message_addresses.h"
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
@@ -25,20 +25,17 @@ class IBmDistribution;
* Benchmark feed handler for feed to service layer or distributor
* using storage api protocol over rpc.
*/
-class StorageApiRpcBmFeedHandler : public IBmFeedHandler
+class StorageApiRpcBmFeedHandler : public StorageApiBmFeedHandlerBase
{
class MyMessageDispatcher;
- vespalib::string _name;
- bool _distributor;
BmStorageMessageAddresses _addresses;
std::atomic<uint32_t> _no_address_error_count;
storage::rpc::SharedRpcResources& _shared_rpc_resources;
std::unique_ptr<MyMessageDispatcher> _message_dispatcher;
std::unique_ptr<storage::rpc::MessageCodecProvider> _message_codec_provider;
std::unique_ptr<storage::rpc::StorageApiRpcService> _rpc_client;
- const IBmDistribution& _distribution;
- void send_rpc(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+ void send_cmd(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker) override;
public:
StorageApiRpcBmFeedHandler(storage::rpc::SharedRpcResources& shared_rpc_resources_in,
std::shared_ptr<const document::DocumentTypeRepo> repo,
@@ -46,14 +43,8 @@ public:
const IBmDistribution& distribution,
bool distributor);
~StorageApiRpcBmFeedHandler();
- void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
- void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
- void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
- void get(const document::Bucket& bucket, vespalib::stringref field_set_string, const document::DocumentId& document_id, PendingTracker& tracker) override;
void attach_bucket_info_queue(PendingTracker &tracker) override;
uint32_t get_error_count() const override;
- const vespalib::string &get_name() const override;
- bool manages_timestamp() const override;
};
}