From ee92e3bd735b67fd839029dabbd6eebd2b56e42d Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Thu, 24 Sep 2020 16:15:22 +0200 Subject: Factor out feed handlers for benchmark feed. --- .../storage_api_rpc_bm_feed_handler.cpp | 145 +++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp (limited to 'searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp') diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp new file mode 100644 index 00000000000..e2f3a951b99 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -0,0 +1,145 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_rpc_bm_feed_handler.h" +#include "pending_tracker.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using document::DocumentTypeRepo; +using storage::api::StorageMessageAddress; +using storage::rpc::SharedRpcResources; + +namespace feedbm { + +namespace { + +FRT_RPCRequest *make_set_cluster_state_request() { + storage::lib::ClusterStateBundle bundle(storage::lib::ClusterState("version:2 distributor:1 storage:1")); + storage::rpc::SlimeClusterStateBundleCodec codec; + auto encoded_bundle = codec.encode(bundle); + auto *req = new FRT_RPCRequest(); + auto* params = req->GetParams(); + params->AddInt8(static_cast(encoded_bundle._compression_type)); + params->AddInt32(encoded_bundle._uncompressed_length); + const auto buf_len = encoded_bundle._buffer->getDataLen(); + params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); + req->SetMethodName("setdistributionstates"); + return req; +} + +void set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMessageAddress &storage_address) { + auto req = make_set_cluster_state_request(); + auto target_resolver = std::make_unique(shared_rpc_resources.slobrok_mirror(), shared_rpc_resources.target_factory()); + auto target = target_resolver->resolve_rpc_target(storage_address); + target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout + assert(!req->IsError()); + req->SubRef(); +} + +} + +class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher +{ + std::mutex _mutex; + vespalib::hash_map _pending; +public: + MyMessageDispatcher() + : storage::MessageDispatcher(), + _mutex(), + _pending() + { + } + ~MyMessageDispatcher() override; + void dispatch_sync(std::shared_ptr msg) override { + release(msg->getMsgId()); + } + void dispatch_async(std::shared_ptr msg) override { + release(msg->getMsgId()); + } + void retain(uint64_t msg_id, PendingTracker &tracker) { + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); + } + void release(uint64_t msg_id) { + PendingTracker *tracker = nullptr; + { + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + assert(itr != _pending.end()); + tracker = itr->second; + _pending.erase(itr); + } + tracker->release(); + } +}; + +StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, std::shared_ptr repo) + : IBmFeedHandler(), + _storage_address(std::make_unique("storage", storage::lib::NodeType::STORAGE, 0)), + _shared_rpc_resources(shared_rpc_resources_in), + _message_dispatcher(std::make_unique()), + _message_codec_provider(std::make_unique(repo, std::make_shared())), + _rpc_client(std::make_unique(*_message_dispatcher, _shared_rpc_resources, *_message_codec_provider, storage::rpc::StorageApiRpcService::Params())) +{ + set_cluster_up(_shared_rpc_resources, *_storage_address); +} + +StorageApiRpcBmFeedHandler::~StorageApiRpcBmFeedHandler() = default; + +void +StorageApiRpcBmFeedHandler::send_rpc(std::shared_ptr cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + cmd->setAddress(*_storage_address); + _message_dispatcher->retain(cmd->getMsgId(), pending_tracker); + _rpc_client->send_rpc_v1_request(std::move(cmd)); +} + +void +StorageApiRpcBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique(bucket, std::move(document), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique(bucket, std::move(document_update), timestamp); + send_rpc(std::move(cmd), tracker); +} + +void +StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique(bucket, document_id, timestamp); + send_rpc(std::move(cmd), tracker); +} + +} -- cgit v1.2.3