diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-08 13:42:12 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-14 13:22:14 +0000 |
commit | 54150406f3c490463bc5371c9480452da168bd5c (patch) | |
tree | 73721c84df79dd649c967a5fe9f0525431476af6 /storage | |
parent | ea22ec7cb6e35cf591d08ffe898388a7f08593cc (diff) |
Basic functionality for direct RPC for StorageAPI communication
This has several advantages:
* Completely bypasses all MessageBus indirections
* Explicit setup of RPC thread pool
* Direct dispatch from RPC thread to persistence queue pool
* Better control of encoding/decoding and buffer usage
Diffstat (limited to 'storage')
28 files changed, 920 insertions, 214 deletions
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index dc0a0194d36..cbfd4f1606d 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -39,6 +39,7 @@ vespa_define_module( src/vespa/storage/persistence src/vespa/storage/persistence/filestorage src/vespa/storage/storageserver + src/vespa/storage/storageserver/rpc src/vespa/storage/storageutil src/vespa/storage/tools src/vespa/storage/visiting diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index e49a69414b7..680779f27cb 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -14,7 +14,7 @@ vespa_add_executable(storage_storageserver_gtest_runner_app TEST communicationmanagertest.cpp configurable_bucket_resolver_test.cpp documentapiconvertertest.cpp - fnet_listener_test.cpp + cluster_controller_rpc_api_service_test.cpp mergethrottlertest.cpp priorityconvertertest.cpp service_layer_error_listener_test.cpp diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp index f82af1f8e5c..7510e0579ea 100644 --- a/storage/src/tests/storageserver/fnet_listener_test.cpp +++ b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp @@ -1,25 +1,27 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/storageserver/fnetlistener.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/messagebus/testlib/slobrok.h> #include <vespa/storage/storageserver/message_enqueuer.h> #include <vespa/storage/storageserver/rpcrequestwrapper.h> -#include <vespa/storage/storageserver/slime_cluster_state_bundle_codec.h> +#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vdstestlib/config/dirconfig.hpp> -#include <vespa/messagebus/testlib/slobrok.h> #include <tests/common/testhelper.h> #include <vespa/vespalib/gtest/gtest.h> #include <vector> -namespace storage { +namespace storage::rpc { using document::FixedBucketSpaces; using namespace ::testing; -struct FNetListenerTest : Test { +struct ClusterControllerApiRpcServiceTest : Test { }; namespace { @@ -38,11 +40,11 @@ struct DummyReturnHandler : FRT_IReturnHandler { }; struct FixtureBase { - // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests mbus::Slobrok slobrok; vdstestlib::DirConfig config; MockOperationEnqueuer enqueuer; - std::unique_ptr<FNetListener> fnet_listener; + std::unique_ptr<SharedRpcResources> shared_rpc_resources; + std::unique_ptr<ClusterControllerApiRpcService> cc_service; DummyReturnHandler return_handler; bool request_is_detached{false}; FRT_RPCRequest* bound_request{nullptr}; @@ -52,7 +54,10 @@ struct FixtureBase { { config.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(config, slobrok); - fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0); + + shared_rpc_resources = std::make_unique<SharedRpcResources>(config.getConfigId(), 0, 1); + cc_service = std::make_unique<ClusterControllerApiRpcService>(enqueuer, *shared_rpc_resources); + shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test"); } virtual ~FixtureBase() { @@ -99,22 +104,22 @@ struct SetStateFixture : FixtureBase { void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) { create_request(bundle); - fnet_listener->RPC_setDistributionStates(bound_request); + cc_service->RPC_setDistributionStates(bound_request); assert_enqueued_operation_has_bundle(bundle); } void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) { - fnet_listener->RPC_setDistributionStates(bound_request); + cc_service->RPC_setDistributionStates(bound_request); ASSERT_FALSE(request_is_detached); ASSERT_TRUE(bound_request->IsError()); ASSERT_EQ(static_cast<uint32_t>(error_code), bound_request->GetErrorCode()); } - lib::ClusterStateBundle dummy_baseline_bundle() const { + static lib::ClusterStateBundle dummy_baseline_bundle() { return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); } - lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const { + static lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) { return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred); } }; @@ -134,14 +139,14 @@ vespalib::string make_compressable_state_string() { } // anon namespace -TEST_F(FNetListenerTest, baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle) { +TEST_F(ClusterControllerApiRpcServiceTest, baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle) { SetStateFixture f; auto baseline = f.dummy_baseline_bundle(); f.assert_request_received_and_propagated(baseline); } -TEST_F(FNetListenerTest, set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle) { +TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle) { SetStateFixture f; lib::ClusterStateBundle spaces_bundle( lib::ClusterState("version:123 distributor:3 storage:3"), @@ -151,7 +156,7 @@ TEST_F(FNetListenerTest, set_distribution_states_rpc_with_derived_enqueues_comma f.assert_request_received_and_propagated(spaces_bundle); } -TEST_F(FNetListenerTest, compressed_bundle_is_transparently_uncompressed) { +TEST_F(ClusterControllerApiRpcServiceTest, compressed_bundle_is_transparently_uncompressed) { SetStateFixture f; auto state_str = make_compressable_state_string(); lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; @@ -160,38 +165,38 @@ TEST_F(FNetListenerTest, compressed_bundle_is_transparently_uncompressed) { // First verify that the bundle is sent in compressed form ASSERT_LT(f.bound_request->GetParams()->GetValue(2)._data._len, state_str.size()); // Ensure we uncompress it to the original form - f.fnet_listener->RPC_setDistributionStates(f.bound_request); + f.cc_service->RPC_setDistributionStates(f.bound_request); f.assert_enqueued_operation_has_bundle(compressable_bundle); } -TEST_F(FNetListenerTest, set_distribution_rpc_is_immediately_failed_if_listener_is_closed) { +TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_rpc_is_immediately_failed_if_listener_is_closed) { SetStateFixture f; f.create_request(f.dummy_baseline_bundle()); - f.fnet_listener->close(); + f.cc_service->close(); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); } -TEST_F(FNetListenerTest, overly_large_uncompressed_bundle_size_parameter_returns_rpc_error) { +TEST_F(ClusterControllerApiRpcServiceTest, overly_large_uncompressed_bundle_size_parameter_returns_rpc_error) { SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); - f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); + f.bind_request_params(encoded_bundle, ClusterControllerApiRpcService::StateBundleMaxUncompressedSize + 1); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } -TEST_F(FNetListenerTest, mismatching_uncompressed_bundle_size_parameter_returns_rpc_error) { +TEST_F(ClusterControllerApiRpcServiceTest, mismatching_uncompressed_bundle_size_parameter_returns_rpc_error) { SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } -TEST_F(FNetListenerTest, true_deferred_activation_flag_can_be_roundtrip_encoded) { +TEST_F(ClusterControllerApiRpcServiceTest, true_deferred_activation_flag_can_be_roundtrip_encoded) { SetStateFixture f; f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true)); } -TEST_F(FNetListenerTest, false_deferred_activation_flag_can_be_roundtrip_encoded) { +TEST_F(ClusterControllerApiRpcServiceTest, false_deferred_activation_flag_can_be_roundtrip_encoded) { SetStateFixture f; f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false)); } @@ -224,12 +229,12 @@ struct ActivateStateFixture : FixtureBase { void assert_request_received_and_propagated(uint32_t activate_version) { create_request(activate_version); - fnet_listener->RPC_activateClusterStateVersion(bound_request); + cc_service->RPC_activateClusterStateVersion(bound_request); assert_enqueued_operation_has_activate_version(activate_version); } }; -TEST_F(FNetListenerTest, activate_cluster_state_version_rpc_enqueues_command_with_version) { +TEST_F(ClusterControllerApiRpcServiceTest, activate_cluster_state_version_rpc_enqueues_command_with_version) { ActivateStateFixture f; f.assert_request_received_and_propagated(1234567); } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 3e4b1fd6515..29f58087ffa 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -55,3 +55,7 @@ mbus.skip_request_thread bool default=false ## Skip communication manager thread on mbus requests ## Experimental skip_thread bool default=false + +## Whether to use direct P2P RPC protocol for all StorageAPI communication +## instead of going via MessageBus. +use_direct_storageapi_rpc bool default=false diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 606d61ab944..538bff5b9f0 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -1,4 +1,5 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + vespa_add_library(storage_storageserver SOURCES bouncer.cpp @@ -12,7 +13,6 @@ vespa_add_library(storage_storageserver distributornodecontext.cpp documentapiconverter.cpp fnet_metrics_wrapper.cpp - fnetlistener.cpp mergethrottler.cpp messagesink.cpp opslogger.cpp @@ -21,13 +21,13 @@ vespa_add_library(storage_storageserver service_layer_error_listener.cpp servicelayernode.cpp servicelayernodecontext.cpp - slime_cluster_state_bundle_codec.cpp statemanager.cpp statereporter.cpp storagemetricsset.cpp storagenode.cpp storagenodecontext.cpp tls_statistics_metrics_wrapper.cpp + $<TARGET_OBJECTS:storage_storageserver_rpc> INSTALL lib64 DEPENDS storage diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index b51394e2e64..283c487f03d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -1,8 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" -#include "fnetlistener.h" +#include "vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h" +#include "vespa/storage/storageserver/rpc/storage_api_rpc_service.h" #include "rpcrequestwrapper.h" +#include "vespa/storage/storageserver/rpc/shared_rpc_resources.h" #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/network/rpcnetworkparams.h> @@ -20,6 +22,7 @@ #include <vespa/log/bufferedlogger.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/documentapi/messagebus/messages/getdocumentreply.h> +#include <string_view> LOG_SETUP(".communication.manager"); @@ -248,14 +251,17 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co : StorageLink("Communication manager"), _component(compReg, "communicationmanager"), _metrics(_component.getLoadTypes()->getMetricLoadTypes()), - _listener(), + _shared_rpc_resources(), // Created upon initial configuration + _storage_api_rpc_service(), // (ditto) + _cc_rpc_service(), // (ditto) _eventQueue(), _mbus(), _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), _thread(), - _skip_thread(false) + _skip_thread(false), + _use_direct_storageapi_rpc(false) { _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); _component.registerMetric(_metrics); @@ -270,8 +276,8 @@ CommunicationManager::onOpen() framework::MilliSecTime maxProcessingTime(60 * 1000); _thread = _component.startThread(*this, maxProcessingTime); - if (_listener) { - _listener->registerHandle(_component.getIdentity()); + if (_shared_rpc_resources) { + _shared_rpc_resources->start_server_and_register_slobrok(_component.getIdentity()); } } @@ -313,8 +319,13 @@ void CommunicationManager::onClose() } } - if (_listener) { - _listener->close(); + // TODO remove? this no longer has any particularly useful semantics + if (_cc_rpc_service) { + _cc_rpc_service->close(); + } + // TODO do this after we drain queues? + if (_shared_rpc_resources) { + _shared_rpc_resources->shutdown(); } // Stopping pumper thread should stop all incoming messages from being @@ -327,6 +338,7 @@ void CommunicationManager::onClose() } // Emptying remaining queued messages + // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { @@ -362,9 +374,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } - if (_listener->getListenPort() != config->rpcport) { + if (_shared_rpc_resources->listen_port() != config->rpcport) { auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.", - _listener->getListenPort(), config->rpcport); + _shared_rpc_resources->listen_port(), config->rpcport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } @@ -407,7 +419,15 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> configureMessageBusLimits(*config); } - _listener = std::make_unique<FNetListener>(*this, _configUri, config->rpcport); + // TODO temporary! + auto repo_getter = [this]{ return _component.getTypeRepo()->documentTypeRepo; }; + auto loadtype_getter = [this]{ return _component.getLoadTypes(); }; + + _use_direct_storageapi_rpc = config->useDirectStorageapiRpc; + // TODO configurable thread pool size + _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, 1/*pool size*/); + _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); + _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(*this, *_shared_rpc_resources, std::move(repo_getter), std::move(loadtype_getter)); if (_mbus) { mbus::DestinationSessionParams dstParams; @@ -542,15 +562,18 @@ CommunicationManager::sendCommand( switch (address.getProtocol()) { case api::StorageMessageAddress::STORAGE: { - LOG(spam, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); - - auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); + LOG(info, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); + if (_use_direct_storageapi_rpc) { + _storage_api_rpc_service->send_rpc_v1_request(msg); + } else { + auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); - cmd->setContext(mbus::Context(msg->getMsgId())); - cmd->setRetryEnabled(address.retryEnabled()); - cmd->setTimeRemaining(msg->getTimeout()); - cmd->setTrace(msg->getTrace()); - sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); + cmd->setContext(mbus::Context(msg->getMsgId())); + cmd->setRetryEnabled(address.retryEnabled()); + cmd->setTimeRemaining(msg->getTimeout()); + cmd->setTrace(msg->getTrace()); + sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); + } break; } case api::StorageMessageAddress::DOCUMENT: @@ -601,8 +624,11 @@ CommunicationManager::sendDirectRPCReply( RPCRequestWrapper& request, const std::shared_ptr<api::StorageReply>& reply) { - std::string requestName(request.getMethodName()); - if (requestName == "getnodestate3") { + std::string_view requestName(request.getMethodName()); // TODO non-name based dispatch + // TODO rework this entire dispatch mechanism :D + if (requestName == "storageapi.v1.send") { + _storage_api_rpc_service->encode_rpc_v1_response(*request.raw_request(), *reply); + } else if (requestName == "getnodestate3") { auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 23b59f5a42a..6e419ea6f19 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -37,19 +37,23 @@ namespace mbus { } namespace storage { +namespace rpc { +class ClusterControllerApiRpcService; +class SharedRpcResources; +class StorageApiRpcService; +} + struct BucketResolver; -class VisitorMbusSession; class Visitor; class VisitorThread; -class FNetListener; class RPCRequestWrapper; class StorageTransportContext : public api::TransportContext { public: - StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg); - StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg); - StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request); - ~StorageTransportContext(); + explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg); + explicit StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg); + explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request); + ~StorageTransportContext() override; std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg; std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; @@ -72,7 +76,9 @@ private: StorageComponent _component; CommunicationManagerMetrics _metrics; - std::unique_ptr<FNetListener> _listener; + std::unique_ptr<rpc::SharedRpcResources> _shared_rpc_resources; + std::unique_ptr<rpc::StorageApiRpcService> _storage_api_rpc_service; + std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service; Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; @@ -112,6 +118,7 @@ private: DocumentApiConverter _docApiConverter; framework::Thread::UP _thread; bool _skip_thread; + bool _use_direct_storageapi_rpc; void updateMetrics(const MetricLockGuard &) override; void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg); diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h deleted file mode 100644 index e37727beb44..00000000000 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/slobrok/sbregister.h> -#include <atomic> - -class FNET_Transport; -class FastOS_ThreadPool; - -namespace storage { - -namespace api { class StorageMessage; } - -class MessageEnqueuer; - -class FNetListener : public FRT_Invokable -{ -public: - static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16; - - FNetListener(MessageEnqueuer& messageEnqueuer, - const config::ConfigUri & configUri, - uint32_t port); - ~FNetListener() override; - - void initRPC(); - void RPC_getNodeState2(FRT_RPCRequest *req); - void RPC_setSystemState2(FRT_RPCRequest *req); - void RPC_getCurrentTime(FRT_RPCRequest *req); - void RPC_setDistributionStates(FRT_RPCRequest* req); - void RPC_activateClusterStateVersion(FRT_RPCRequest* req); - - void registerHandle(vespalib::stringref handle); - void close(); - int getListenPort() const; - -private: - MessageEnqueuer& _messageEnqueuer; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - std::atomic<bool> _closed; - slobrok::api::RegisterAPI _slobrokRegister; - vespalib::string _handle; - - void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req); -}; - -} diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h index 921328a054b..aaa988940d3 100644 --- a/storage/src/vespa/storage/storageserver/message_enqueuer.h +++ b/storage/src/vespa/storage/storageserver/message_enqueuer.h @@ -11,6 +11,7 @@ namespace api { class StorageMessage; } class MessageEnqueuer { public: virtual ~MessageEnqueuer() = default; + // TODO separate into explicit direct dispatch and threaded enqueue virtual void enqueue(std::shared_ptr<api::StorageMessage> msg) = 0; }; diff --git a/storage/src/vespa/storage/storageserver/rpc/.gitignore b/storage/src/vespa/storage/storageserver/rpc/.gitignore new file mode 100644 index 00000000000..3221e7351af --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/.gitignore @@ -0,0 +1,2 @@ +*.pb.h +*.pb.cc
\ No newline at end of file diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt new file mode 100644 index 00000000000..f42656068f6 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +find_package(Protobuf REQUIRED) +PROTOBUF_GENERATE_CPP(storage_storageserver_rpc_PROTOBUF_SRCS storage_storageserver_rpc_PROTOBUF_HDRS + protobuf/rpc_envelope.proto +) + +vespa_add_source_target(protobufgen_storage_storageserver_rpc DEPENDS + ${storage_storageserver_rpc_PROTOBUF_SRCS} + ${storage_storageserver_rpc_PROTOBUF_HDRS}) + +vespa_suppress_warnings_for_protobuf_sources(SOURCES ${storage_storageserver_rpc_PROTOBUF_SRCS}) + +vespa_add_library(storage_storageserver_rpc OBJECT + SOURCES + caching_rpc_target_resolver.cpp + cluster_controller_api_rpc_service.cpp + rpc_target.cpp + shared_rpc_resources.cpp + slime_cluster_state_bundle_codec.cpp + storage_api_rpc_service.cpp + ${storage_storageserver_rpc_PROTOBUF_SRCS} + DEPENDS +)
\ No newline at end of file diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp new file mode 100644 index 00000000000..7ccd01e2ccc --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp @@ -0,0 +1,82 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "caching_rpc_target_resolver.h" +#include "shared_rpc_resources.h" +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/stllike/hash_map.hpp> + +#include <vespa/log/log.h> +LOG_SETUP(".storage.caching_rpc_target_resolver"); + +namespace storage::rpc { + +CachingRpcTargetResolver::CachingRpcTargetResolver(SharedRpcResources& rpc_resources) + : _rpc_resources(rpc_resources) +{ +} + +CachingRpcTargetResolver::~CachingRpcTargetResolver() = default; + +namespace { + +vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address) { + vespalib::asciistream as; + as << "storage/cluster." << address.getCluster() + << '/' << ((address.getNodeType() == lib::NodeType::STORAGE) ? "storage" : "distributor") + << '/' << address.getIndex(); + return as.str(); +} + +} + +// TODO ensure this is robust and performant wrt. visitor clients constantly bumping +// slobrok generations by registering new sessions all the time. +std::shared_ptr<RpcTarget> +CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) { + // TODO or map directly from address to target instead of going via stringification? Needs hashing, if so. + auto sb_id = address_to_slobrok_id(address); + const uint32_t current_sb_gen = _rpc_resources.slobrok_mirror().updates(); + { + std::shared_lock lock(_targets_rwmutex); + auto target_iter = _targets.find(sb_id); + if ((target_iter != _targets.end()) + && target_iter->second->_target->IsValid() + && (target_iter->second->_sb_generation == current_sb_gen)) + { + return target_iter->second; + } + } + auto specs = _rpc_resources.slobrok_mirror().lookup(sb_id); // FIXME string type mismatch; implicit conv! + if (specs.empty()) { + LOG(info, "Found no mapping for %s", sb_id.c_str()); + // TODO return potentially stale existing target if no longer existing in SB? + // TODO or clear any existing mapping? + return {}; + } + const auto& candidate_spec = specs[0].second; // Always use first spec in list. TODO correct? + std::unique_lock lock(_targets_rwmutex); + // If address has the same spec as the existing target, just reuse it. + auto target_iter = _targets.find(sb_id); + if ((target_iter != _targets.end()) + && (target_iter->second->_target->IsValid()) + && (target_iter->second->_spec == candidate_spec)) + { + LOG(info, "Updating existing mapping %s -> %s (gen %u) to gen %u", + sb_id.c_str(), candidate_spec.c_str(), target_iter->second->_sb_generation, current_sb_gen); + target_iter->second->_sb_generation = current_sb_gen; + return target_iter->second; + } + // Insert new mapping or update the old one. + auto* raw_target = _rpc_resources.supervisor().GetTarget(candidate_spec.c_str()); // TODO expensive inside lock? + assert(raw_target); + auto rpc_target = std::make_shared<RpcTarget>(raw_target, candidate_spec, current_sb_gen); + _targets[sb_id] = rpc_target; + LOG(info, "Added mapping %s -> %s at gen %u", sb_id.c_str(), candidate_spec.c_str(), current_sb_gen); + return rpc_target; +} + + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h new file mode 100644 index 00000000000..ac91019806b --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h @@ -0,0 +1,32 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "rpc_target.h" +#include <vespa/vespalib/stllike/hash_map.h> +#include <memory> +#include <shared_mutex> + +namespace storage { + +namespace api { class StorageMessageAddress; } + +namespace rpc { + +class SharedRpcResources; + +class CachingRpcTargetResolver { + SharedRpcResources& _rpc_resources; + mutable std::shared_mutex _targets_rwmutex; + // TODO LRU? Size cap? + vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets; +public: + // TODO pass explicit slobrok mirror interface and supervisor to make testing easier + // TODO consider wrapping supervisor to make testing easier + explicit CachingRpcTargetResolver(SharedRpcResources& rpc_resources); + ~CachingRpcTargetResolver(); + + std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address); +}; + +} // rpc +} // storage diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp index c5d7880d966..09a42887a5d 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp @@ -1,119 +1,81 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "fnetlistener.h" -#include "communicationmanager.h" -#include "rpcrequestwrapper.h" +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "cluster_controller_api_rpc_service.h" +#include "shared_rpc_resources.h" #include "slime_cluster_state_bundle_codec.h" +#include <vespa/storage/storageserver/communicationmanager.h> +#include <vespa/storage/storageserver/message_enqueuer.h> +#include <vespa/storage/storageserver/rpcrequestwrapper.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/host_name.h> -#include <vespa/vespalib/util/time.h> -#include <vespa/fnet/frt/supervisor.h> -#include <vespa/fnet/transport.h> -#include <sstream> -#include <thread> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/log.h> -LOG_SETUP(".rpc.listener"); - -namespace storage { - -FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port) - : _messageEnqueuer(messageEnqueuer), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), - _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get())), - _closed(false), - _slobrokRegister(*_orb, configUri) -{ - initRPC(); - if (!_orb->Listen(port)) { - std::ostringstream ost; - ost << "Failed to listen to RPC port " << port << "."; - throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); - } - _transport->Start(_threadPool.get()); -} +LOG_SETUP(".storage.cluster_controller_api_rpc_service"); -FNetListener::~FNetListener() -{ - if (!_closed) { - close(); - } -} +namespace storage::rpc { -int -FNetListener::getListenPort() const +ClusterControllerApiRpcService::ClusterControllerApiRpcService( + MessageEnqueuer& message_enqueuer, + SharedRpcResources& rpc_resources) + : _message_enqueuer(message_enqueuer), + _rpc_resources(rpc_resources), + _closed(false) { - return _orb->GetListenPort(); + register_server_methods(rpc_resources); } -void -FNetListener::registerHandle(vespalib::stringref handle) { - _slobrokRegister.registerName(handle); - while (_slobrokRegister.busy()) { - LOG(debug, "Waiting to register in slobrok"); - std::this_thread::sleep_for(50ms); - } - _handle = handle; -} +ClusterControllerApiRpcService::~ClusterControllerApiRpcService() = default; -void -FNetListener::close() -{ - _closed = true; - _slobrokRegister.unregisterName(_handle); - _transport->ShutDown(true); +void ClusterControllerApiRpcService::close() { + _closed.store(true); } -void -FNetListener::initRPC() -{ - FRT_ReflectionBuilder rb(_orb.get()); +void ClusterControllerApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) { + FRT_ReflectionBuilder rb(&rpc_resources.supervisor()); - rb.DefineMethod("getnodestate3", "sii", "ss", FRT_METHOD(FNetListener::RPC_getNodeState2), this); + rb.DefineMethod("getnodestate3", "sii", "ss", FRT_METHOD(ClusterControllerApiRpcService::RPC_getNodeState2), this); rb.MethodDesc("Get state of this node"); rb.ParamDesc("nodestate", "Expected state of given node. If correct, the " - "request will be queued on target until it changes. To not give " - "any state use the string 'unknown', enforcing a direct reply."); + "request will be queued on target until it changes. To not give " + "any state use the string 'unknown', enforcing a direct reply."); rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester"); rb.ReturnDesc("nodestate", "State string for this node"); rb.ReturnDesc("hostinfo", "Information about host this node is running on"); //------------------------------------------------------------------------- - rb.DefineMethod("getnodestate2", "si", "s", FRT_METHOD(FNetListener::RPC_getNodeState2), this); + rb.DefineMethod("getnodestate2", "si", "s", FRT_METHOD(ClusterControllerApiRpcService::RPC_getNodeState2), this); rb.MethodDesc("Get state of this node"); rb.ParamDesc("nodestate", "Expected state of given node. If correct, the " - "request will be queued on target until it changes. To not give " - "any state use the string 'unknown', enforcing a direct reply."); + "request will be queued on target until it changes. To not give " + "any state use the string 'unknown', enforcing a direct reply."); rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester"); rb.ReturnDesc("nodestate", "State string for this node"); //------------------------------------------------------------------------- - rb.DefineMethod("setsystemstate2", "s", "", FRT_METHOD(FNetListener::RPC_setSystemState2), this); + rb.DefineMethod("setsystemstate2", "s", "", FRT_METHOD(ClusterControllerApiRpcService::RPC_setSystemState2), this); rb.MethodDesc("Set systemstate on this node"); rb.ParamDesc("systemstate", "New systemstate to set"); //------------------------------------------------------------------------- - rb.DefineMethod("setdistributionstates", "bix", "", FRT_METHOD(FNetListener::RPC_setDistributionStates), this); + rb.DefineMethod("setdistributionstates", "bix", "", FRT_METHOD(ClusterControllerApiRpcService::RPC_setDistributionStates), this); rb.MethodDesc("Set distribution states for cluster and bucket spaces"); rb.ParamDesc("compressionType", "Compression type for payload"); rb.ParamDesc("uncompressedSize", "Uncompressed size for payload"); rb.ParamDesc("payload", "Binary Slime format payload"); //------------------------------------------------------------------------- - rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this); + rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(ClusterControllerApiRpcService::RPC_activateClusterStateVersion), this); rb.MethodDesc("Explicitly activates an already prepared cluster state version"); rb.ParamDesc("activate_version", "Expected cluster state version to activate"); rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC"); //------------------------------------------------------------------------- - rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this); + rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(ClusterControllerApiRpcService::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); rb.ReturnDesc("nanoseconds", "additional nanoseconds since epoch"); rb.ReturnDesc("hostname", "Host name"); - //------------------------------------------------------------------------- } - -void -FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req) -{ +// TODO remove? is this used by anyone? +void ClusterControllerApiRpcService::RPC_getCurrentTime(FRT_RPCRequest* req) { if (_closed) { LOG(debug, "Not handling RPC call getCurrentTime() as we have closed"); req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); @@ -127,19 +89,19 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req) vespalib::string hostname = vespalib::HostName::get(); req->GetReturn()->AddString(hostname.c_str()); // all handled, will return immediately - return; } -void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req) { +void ClusterControllerApiRpcService::detach_and_forward_to_enqueuer( + std::shared_ptr<api::StorageMessage> cmd, + FRT_RPCRequest* req) +{ // Create a request object to avoid needing a separate transport type cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req))); req->Detach(); - _messageEnqueuer.enqueue(std::move(cmd)); + _message_enqueuer.enqueue(std::move(cmd)); } -void -FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) -{ +void ClusterControllerApiRpcService::RPC_getNodeState2(FRT_RPCRequest* req) { if (_closed) { LOG(debug, "Not handling RPC call getNodeState2() as we have closed"); req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); @@ -147,11 +109,11 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) } vespalib::string expected(req->GetParams()->GetValue(0)._string._str, - req->GetParams()->GetValue(0)._string._len); + req->GetParams()->GetValue(0)._string._len); - auto cmd(std::make_shared<api::GetNodeStateCommand>(expected != "unknown" - ? std::make_unique<lib::NodeState>(expected) - : std::unique_ptr<lib::NodeState>())); + auto cmd = std::make_shared<api::GetNodeStateCommand>(expected != "unknown" + ? std::make_unique<lib::NodeState>(expected) + : std::unique_ptr<lib::NodeState>()); cmd->setPriority(api::StorageMessage::VERYHIGH); cmd->setTimeout(std::chrono::milliseconds(req->GetParams()->GetValue(1)._intval32)); @@ -161,9 +123,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) detach_and_forward_to_enqueuer(std::move(cmd), req); } -void -FNetListener::RPC_setSystemState2(FRT_RPCRequest *req) -{ +void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) { if (_closed) { LOG(debug, "Not handling RPC call setSystemState2() as we have closed"); req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); @@ -173,7 +133,7 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req) req->GetParams()->GetValue(0)._string._len); lib::ClusterState systemState(systemStateStr); - auto cmd(std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState))); + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)); cmd->setPriority(api::StorageMessage::VERYHIGH); detach_and_forward_to_enqueuer(std::move(cmd), req); @@ -183,10 +143,10 @@ namespace { std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const FRT_Values& params) { const uint32_t uncompressed_length = params[1]._intval32; - if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) { + if (uncompressed_length > ClusterControllerApiRpcService::StateBundleMaxUncompressedSize) { throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is " "greater than max size (%u)", uncompressed_length, - FNetListener::StateBundleMaxUncompressedSize)); + ClusterControllerApiRpcService::StateBundleMaxUncompressedSize)); } SlimeClusterStateBundleCodec codec; EncodedClusterStateBundle encoded_bundle; @@ -200,7 +160,7 @@ std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const F } -void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { +void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* req) { if (_closed) { LOG(debug, "Not handling RPC call setDistributionStates() as we have closed"); req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); @@ -223,7 +183,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { detach_and_forward_to_enqueuer(std::move(cmd), req); } -void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { +void ClusterControllerApiRpcService::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { if (_closed) { LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed"); req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h new file mode 100644 index 00000000000..837f00fad5e --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h @@ -0,0 +1,50 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fnet/frt/invokable.h> +#include <atomic> +#include <memory> + +class FRT_RPCRequest; + +namespace storage { + +class MessageEnqueuer; + +namespace api { +class StorageCommand; +class StorageMessage; +class StorageMessageAddress; +class StorageReply; +} + +namespace rpc { + +class SharedRpcResources; + +class ClusterControllerApiRpcService : public FRT_Invokable { + MessageEnqueuer& _message_enqueuer; + SharedRpcResources& _rpc_resources; + std::atomic<bool> _closed; +public: + static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16; + + ClusterControllerApiRpcService(MessageEnqueuer& message_enqueuer, + SharedRpcResources& rpc_resources); + ~ClusterControllerApiRpcService() override; + + void close(); + + void RPC_getNodeState2(FRT_RPCRequest* req); + void RPC_setSystemState2(FRT_RPCRequest* req); + void RPC_getCurrentTime(FRT_RPCRequest* req); + void RPC_setDistributionStates(FRT_RPCRequest* req); + void RPC_activateClusterStateVersion(FRT_RPCRequest* req); +private: + void register_server_methods(SharedRpcResources&); + // TODO factor out as shared functionality + void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); +}; + +} // rpc +} // storage diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h index 41c5db9876d..ea6ef2649d0 100644 --- a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h @@ -4,9 +4,9 @@ #include "encoded_cluster_state_bundle.h" -namespace storage { +namespace storage::lib { class ClusterStateBundle; } -namespace lib { class ClusterStateBundle; } +namespace storage::rpc { /** * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC. diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h index 6f25a6b67a6..92e66aab378 100644 --- a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h +++ b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h @@ -5,7 +5,7 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> -namespace storage { +namespace storage::rpc { /** * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle. diff --git a/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto new file mode 100644 index 00000000000..2eab6068bad --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto @@ -0,0 +1,15 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.protobuf; + +message RequestHeader { + uint64 time_remaining_ms = 1; + uint32 trace_level = 2; +} + +message ResponseHeader { + bytes trace_payload = 1; +} diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp new file mode 100644 index 00000000000..ddf881d3bd3 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp @@ -0,0 +1,17 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "rpc_target.h" +#include <vespa/fnet/frt/target.h> + +namespace storage::rpc { + +RpcTarget::RpcTarget(FRT_Target* target, vespalib::stringref spec, uint32_t sb_generation) + : _target(target), + _spec(spec), + _sb_generation(sb_generation) +{} + +RpcTarget::~RpcTarget() { + _target->SubRef(); +} + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h new file mode 100644 index 00000000000..7fd1ddeafaf --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h @@ -0,0 +1,27 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <cstdint> + +class FRT_Target; + +namespace storage::rpc { + +struct RpcTarget { + FRT_Target* _target; + const vespalib::string _spec; + uint32_t _sb_generation; + + // Target must have ref count of at least 1 + RpcTarget(FRT_Target* target, + vespalib::stringref spec, + uint32_t sb_generation); + RpcTarget(const RpcTarget&) = delete; + RpcTarget& operator=(const RpcTarget&) = delete; + RpcTarget(RpcTarget&&) = delete; + RpcTarget& operator=(RpcTarget&&) = delete; + ~RpcTarget(); +}; + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp new file mode 100644 index 00000000000..4ce1732c6f8 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -0,0 +1,74 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "shared_rpc_resources.h" +#include <vespa/fastos/thread.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/transport.h> +#include <vespa/slobrok/sbregister.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <cassert> +#include <chrono> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".storage.shared_rpc_resources"); + +using namespace std::chrono_literals; + +namespace storage::rpc { + +SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, + int rpc_server_port, + size_t rpc_thread_pool_size) + : _thread_pool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>(rpc_thread_pool_size)), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), + _slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, config_uri)), + _slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config_uri)), + _rpc_server_port(rpc_server_port), + _shutdown(false) +{ +} + +// TODO make sure init/shutdown is safe for aborted init in comm. mgr. + +SharedRpcResources::~SharedRpcResources() { + if (!_shutdown) { + shutdown(); + } +} + +void SharedRpcResources::start_server_and_register_slobrok(vespalib::stringref my_handle) { + LOG(debug, "Starting main RPC supervisor on port %d", _rpc_server_port); + if (!_orb->Listen(_rpc_server_port)) { + throw vespalib::IllegalStateException(vespalib::make_string("Failed to listen to RPC port %d", _rpc_server_port), + VESPA_STRLOC); + } + _transport->Start(_thread_pool.get()); + _slobrok_register->registerName(my_handle); + wait_until_slobrok_is_ready(); + _handle = my_handle; +} + +void SharedRpcResources::wait_until_slobrok_is_ready() { + // TODO look more closely at how mbus does its slobrok business + while (_slobrok_register->busy() || !_slobrok_mirror->ready()) { + // TODO some form of timeout mechanism here, and warning logging to identify SB issues + LOG(debug, "Waiting for Slobrok to become ready"); + std::this_thread::sleep_for(50ms); + } +} + +void SharedRpcResources::shutdown() { + assert(!_shutdown); + _slobrok_register->unregisterName(_handle); + _transport->ShutDown(true); + _shutdown = true; +} + +int SharedRpcResources::listen_port() const noexcept { + return _orb->GetListenPort(); +} + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h new file mode 100644 index 00000000000..9b2f1c04249 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -0,0 +1,49 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/config/subscription/configuri.h> +#include <vespa/vespalib/stllike/string.h> +#include <memory> + +class FastOS_ThreadPool; +class FNET_Transport; +class FRT_Supervisor; + +namespace slobrok::api { +class RegisterAPI; +class MirrorAPI; +} + +namespace storage::rpc { + +class SharedRpcResources { + std::unique_ptr<FastOS_ThreadPool> _thread_pool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register; + std::unique_ptr<slobrok::api::MirrorAPI> _slobrok_mirror; + vespalib::string _handle; + int _rpc_server_port; + bool _shutdown; +public: + SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port, size_t rpc_thread_pool_size); + ~SharedRpcResources(); + + FRT_Supervisor& supervisor() noexcept { return *_orb; } + const FRT_Supervisor& supervisor() const noexcept { return *_orb; } + + slobrok::api::RegisterAPI& slobrok_register() noexcept { return *_slobrok_register; } + const slobrok::api::RegisterAPI& slobrok_register() const noexcept { return *_slobrok_register; } + slobrok::api::MirrorAPI& slobrok_mirror() noexcept { return *_slobrok_mirror; } + const slobrok::api::MirrorAPI& slobrok_mirror() const noexcept { return *_slobrok_mirror; } + // To be called after all RPC handlers have been registered. + void start_server_and_register_slobrok(vespalib::stringref my_handle); + + void shutdown(); + [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started +private: + void wait_until_slobrok_is_ready(); +}; + + +} diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp index 1f854bc724e..0e8a3081aa2 100644 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp @@ -18,7 +18,7 @@ using vespalib::compression::compress; using vespalib::Memory; using namespace vespalib::slime; -namespace storage { +namespace storage::rpc { // TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp namespace { diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h index 1fb95134059..0c25de5faa5 100644 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h +++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h @@ -5,7 +5,7 @@ #include "cluster_state_bundle_codec.h" #include <memory> -namespace storage { +namespace storage::rpc { /** * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp new file mode 100644 index 00000000000..4c911f092a3 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -0,0 +1,302 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "storage_api_rpc_service.h" +#include "caching_rpc_target_resolver.h" +#include "shared_rpc_resources.h" +#include "rpc_envelope.pb.h" +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/slobrok/sbmirror.h> +#include <vespa/storage/storageserver/communicationmanager.h> +#include <vespa/storage/storageserver/rpcrequestwrapper.h> +#include <vespa/storageapi/mbusprot/protocolserialization7.h> +#include <vespa/storageapi/messageapi/storagecommand.h> +#include <vespa/vespalib/data/databuffer.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".storage.storage_api_rpc_service"); + +namespace storage::rpc { + +StorageApiRpcService::StorageApiRpcService(MessageEnqueuer& messageEnqueuer, + SharedRpcResources& rpc_resources, + std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func, + std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func) + : _message_enqueuer(messageEnqueuer), + _rpc_resources(rpc_resources), + // TODO these are temporary, need to be consolidated and moved out + _doctype_repo_func(std::move(doctype_repo_func)), + _loadtype_set_func(std::move(loadtype_set_func)), + _target_resolver(std::make_unique<CachingRpcTargetResolver>(rpc_resources)) +{ + register_server_methods(rpc_resources); +} + +StorageApiRpcService::~StorageApiRpcService() = default; + +void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) { + FRT_ReflectionBuilder rb(&rpc_resources.supervisor()); + rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this); + rb.MethodDesc("V1 of StorageAPI direct RPC protocol"); + rb.ParamDesc("header_encoding", "0=raw, 6=lz4"); + rb.ParamDesc("header_decoded_size", "Uncompressed header blob size"); + rb.ParamDesc("header_payload", "The message header blob"); + rb.ParamDesc("body_encoding", "0=raw, 6=lz4"); + rb.ParamDesc("body_decoded_size", "Uncompressed body blob size"); + rb.ParamDesc("body_payload", "The message body blob"); + rb.ReturnDesc("header_encoding", "0=raw, 6=lz4"); + rb.ReturnDesc("header_decoded_size", "Uncompressed header blob size"); + rb.ReturnDesc("header_payload", "The reply header blob"); + rb.ReturnDesc("body_encoding", "0=raw, 6=lz4"); + rb.ReturnDesc("body_decoded_size", "Uncompressed body blob size"); + rb.ReturnDesc("body_payload", "The reply body blob"); +} + +void StorageApiRpcService::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req) { + // Create a request object to avoid needing a separate transport type + cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req))); + req->Detach(); + _message_enqueuer.enqueue(std::move(cmd)); +} + +namespace { + +struct SubRefDeleter { + template <typename T> + void operator()(T* v) const noexcept { + v->SubRef(); + } +}; + +template <typename HeaderType> +bool decode_header_from_rpc_params(const FRT_Values& params, HeaderType& hdr) { + const auto compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8); + const uint32_t uncompressed_length = params[1]._intval32; + + if (compression_type == vespalib::compression::CompressionConfig::NONE) { + // Fast-path in the common case where request header is not compressed. + return hdr.ParseFromArray(params[2]._data._buf, params[2]._data._len); + } else { + vespalib::DataBuffer uncompressed(params[2]._data._buf, params[2]._data._len); + vespalib::ConstBufferRef blob(params[2]._data._buf, params[2]._data._len); + decompress(compression_type, uncompressed_length, blob, uncompressed, true); + assert(uncompressed_length == uncompressed.getDataLen()); + return hdr.ParseFromArray(uncompressed.getData(), uncompressed.getDataLen()); + } +} + +// Must be done prior to adding payload +template <typename HeaderType> +void encode_header_into_rpc_params(HeaderType& hdr, FRT_Values& params) { + params.AddInt8(vespalib::compression::CompressionConfig::Type::NONE); // TODO when needed + const auto header_size = hdr.ByteSizeLong(); + assert(header_size <= UINT32_MAX); + params.AddInt32(static_cast<uint32_t>(header_size)); + auto* header_buf = reinterpret_cast<uint8_t*>(params.AddData(header_size)); + hdr.SerializeWithCachedSizesToArray(header_buf); +} + +void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& params) { + assert(payload.size() <= UINT32_MAX); + vespalib::ConstBufferRef to_compress(payload.data(), payload.size()); + vespalib::DataBuffer buf(vespalib::roundUp2inN(payload.size())); + // TODO configurable compression config? + vespalib::compression::CompressionConfig comp_cfg(vespalib::compression::CompressionConfig::Type::LZ4); + auto comp_type = compress(comp_cfg, to_compress, buf, false); + assert(buf.getDataLen() <= UINT32_MAX); + + params.AddInt8(comp_type); + params.AddInt32(static_cast<uint32_t>(to_compress.size())); + params.AddData(buf.stealBuffer(), buf.getDataLen()); +} + +} // anon ns + +template <typename MessageType> +void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params) { + // Shared ptrs must stay alive + // TODO move out, cache + auto repo = _doctype_repo_func(); + auto load_types = _loadtype_set_func(); + mbusprot::ProtocolSerialization7 codec(repo, *load_types); + auto payload = codec.encode(msg); + + compress_and_add_payload_to_rpc_params(payload, params); +} + +template <typename PayloadCodecCallback> +void StorageApiRpcService::uncompress_rpc_payload( + const FRT_Values& params, + PayloadCodecCallback payload_callback) +{ + const auto compression_type = vespalib::compression::CompressionConfig::toType(params[3]._intval8); + const uint32_t uncompressed_length = params[4]._intval32; + // TODO fast path if uncompressed? + vespalib::DataBuffer uncompressed(params[5]._data._buf, params[5]._data._len); + vespalib::ConstBufferRef blob(params[5]._data._buf, params[5]._data._len); + decompress(compression_type, uncompressed_length, blob, uncompressed, true); + assert(uncompressed_length == uncompressed.getDataLen()); + assert(uncompressed_length <= UINT32_MAX); + + // Shared ptrs must stay alive + // TODO move out, cache + auto repo = _doctype_repo_func(); + auto loadtypes = _loadtype_set_func(); + mbusprot::ProtocolSerialization7 codec(repo, *loadtypes); + + payload_callback(codec, mbus::BlobRef(uncompressed.getData(), uncompressed_length)); +} + +void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { + LOG(info, "Server: received rpc.v1 request"); + // TODO do we need to manually check the parameter/return spec here? + const auto& params = *req->GetParams(); + protobuf::RequestHeader hdr; + if (!decode_header_from_rpc_params(params, hdr)) { + req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request header protobuf"); + return; + } + std::unique_ptr<mbusprot::StorageCommand> cmd; + uint32_t uncompressed_size = 0; + uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) { + cmd = codec.decodeCommand(payload); + uncompressed_size = static_cast<uint32_t>(payload.size()); + }); + if (cmd && cmd->has_command()) { + auto scmd = cmd->steal_command(); + scmd->setApproxByteSize(uncompressed_size); + scmd->getTrace().setLevel(hdr.trace_level()); + scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms())); + req->DiscardBlobs(); + detach_and_forward_to_enqueuer(std::move(scmd), req); + } else { + req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request payload"); + } +} + +void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) { + LOG(info, "Server: encoding rpc.v1 response header and payload"); + auto* ret = request.GetReturn(); + + // TODO skip encoding header altogether if no relevant fields set? + protobuf::ResponseHeader hdr; + if (reply.getTrace().getLevel() > 0) { + hdr.set_trace_payload(reply.getTrace().getRoot().encode()); + } + // TODO consistent naming... + encode_header_into_rpc_params(hdr, *ret); + encode_and_compress_rpc_payload<api::StorageReply>(reply, *ret); +} + +void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd) { + LOG(info, "Client: sending rpc.v1 request for message of type %s", cmd->getType().getName().c_str()); + + assert(cmd->getAddress() != nullptr); + auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress()); + if (!target) { + // TODO need to bounce reply with TODO descriptive error code, mirroring that of RPCNetwork! + auto reply = cmd->makeReply(); + reply->setResult(api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE), + "Couldn't find the darn thing in Slobrok")); // TODO :D + // TODO always enforce separate thread for this, or we risk nuking the + // stack if the reply receiver keeps resending immediately and synchronously. + _message_enqueuer.enqueue(std::move(reply)); // TODO dispatch_async_in_separate_thread(std::move(reply))? + return; + } + std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); + req->SetMethodName("storageapi.v1.send"); + + protobuf::RequestHeader req_hdr; + req_hdr.set_time_remaining_ms(std::chrono::duration_cast<std::chrono::milliseconds>(cmd->getTimeout()).count()); + req_hdr.set_trace_level(cmd->getTrace().getLevel()); + + auto* params = req->GetParams(); + encode_header_into_rpc_params(req_hdr, *params); + encode_and_compress_rpc_payload<api::StorageCommand>(*cmd, *params); + + const auto timeout = cmd->getTimeout(); + // TODO verify it's fine that we alloc this on the request stash and use it this way + auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd), timeout); + req->SetContext(FNET_Context(&req_ctx)); + + target->_target->InvokeAsync(req.release(), vespalib::to_s(timeout), this); +} + +namespace { + +api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, vespalib::duration timeout) { + // TODO determine all codes that must be (re)mapped. Current remapping adapted from RPCSend + // TODO need to keep enough state to give peer ID/spec in error messages + switch (req.GetErrorCode()) { + case FRTE_RPC_TIMEOUT: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT), + vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s", + "TODO", vespalib::to_s(timeout), req.GetErrorMessage())); + case FRTE_RPC_CONNECTION: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR), + vespalib::make_string("A connection error occurred for '%s'; %s", + "TODO", req.GetErrorMessage())); + default: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR), + vespalib::make_string("A network error occurred for '%s'; %s", + "TODO", req.GetErrorMessage())); + } +} + +} + +void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { + std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); + auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); + if (!req->CheckReturnTypes("bixbix")) { + api::ReturnCode error = map_frt_error_to_storage_api_error(*req, req_ctx->_timeout); + LOG(info, "Client: received rpc.v1 error response: %s", error.toString().c_str()); + auto error_reply = req_ctx->_originator_cmd->makeReply(); + error_reply->setResult(std::move(error)); + // TODO needs tracing of received-event! + _message_enqueuer.enqueue(std::move(error_reply)); + return; + } + LOG(info, "Client: received rpc.v1 OK response"); + + const auto& ret = *req->GetReturn(); + protobuf::ResponseHeader hdr; + if (!decode_header_from_rpc_params(ret, hdr)) { + assert(false); // TODO generate error reply + return; + } + std::unique_ptr<mbusprot::StorageReply> wrapped_reply; + uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { + wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd); + }); + // TODO the reply wrapper does lazy deserialization. Can we/should we ever defer? + auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing + assert(reply); + + if (!hdr.trace_payload().empty()) { + req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + } + reply->getTrace().swap(req_ctx->_originator_cmd->getTrace()); + + // TODO ensure that no implicit long-lived refs end up pointing into RPC memory...! + req->DiscardBlobs(); + _message_enqueuer.enqueue(std::move(reply)); +} + +/* + * Major TODOs: + * - tracing and trace propagation + * - forwards/backwards compatibility + * - lifetime semantics of FRT targets vs requests created from them? + * - lifetime of document type/fieldset repos vs messages + * - is repo ref squirreled away into the messages anywhere? + * - everything else! :3 + */ + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h new file mode 100644 index 00000000000..414c1d4e93c --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -0,0 +1,75 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "rpc_target.h" +#include <vespa/fnet/frt/invokable.h> +#include <vespa/fnet/frt/invoker.h> +#include <vespa/vespalib/stllike/string.h> +#include <atomic> +#include <functional> +#include <memory> + +class FRT_RPCRequest; +class FRT_Target; + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class LoadTypeSet; } + +namespace storage { + +class MessageEnqueuer; + +namespace api { +class StorageCommand; +class StorageMessage; +class StorageMessageAddress; +class StorageReply; +} + +namespace rpc { + +class SharedRpcResources; +class CachingRpcTargetResolver; + +class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait { + MessageEnqueuer& _message_enqueuer; + SharedRpcResources& _rpc_resources; +public: + StorageApiRpcService(MessageEnqueuer& messageEnqueuer, + SharedRpcResources& rpc_resources, + // TODO temporary! + std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func, + std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func); + ~StorageApiRpcService() override; + + void RPC_rpc_v1_send(FRT_RPCRequest* req); + void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); + void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); +private: + // TODO dedupe + void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); + + struct RpcRequestContext { + std::shared_ptr<api::StorageCommand> _originator_cmd; + std::chrono::nanoseconds _timeout; + + RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd, std::chrono::nanoseconds timeout) + : _originator_cmd(std::move(cmd)), + _timeout(timeout) + {} + }; + + std::function<std::shared_ptr<const document::DocumentTypeRepo>()> _doctype_repo_func; + std::function<std::shared_ptr<documentapi::LoadTypeSet>()> _loadtype_set_func; + std::unique_ptr<CachingRpcTargetResolver> _target_resolver; + + void register_server_methods(SharedRpcResources&); + template <typename PayloadCodecCallback> + void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback); + template <typename MessageType> + void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params); + void RequestDone(FRT_RPCRequest* request) override; +}; + +} // rpc +} // storage diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp index 5f3ff86fe8c..2a083a2d704 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp @@ -7,13 +7,13 @@ namespace storage { RPCRequestWrapper::RPCRequestWrapper(FRT_RPCRequest *req) - : _req(req) + : _req(req) { } RPCRequestWrapper::~RPCRequestWrapper() { - if (_req != 0) { + if (_req) { _req->SetError(ERR_REQUEST_DELETED, "Request deleted without having been replied to"); _req->Return(); } @@ -22,7 +22,7 @@ RPCRequestWrapper::~RPCRequestWrapper() const char * RPCRequestWrapper::getParam() const { - assert(_req != 0); + assert(_req); return _req->GetParams()->GetValue(0)._data._buf; } @@ -30,7 +30,7 @@ RPCRequestWrapper::getParam() const uint32_t RPCRequestWrapper::getParamLen() const { - assert(_req != 0); + assert(_req); return _req->GetParams()->GetValue(0)._data._len; } @@ -38,26 +38,26 @@ RPCRequestWrapper::getParamLen() const void RPCRequestWrapper::returnData(const char *pt, uint32_t len) { - assert(_req != 0); + assert(_req); _req->GetReturn()->AddData(pt, len); _req->Return(); - _req = 0; + _req = nullptr; } void RPCRequestWrapper::returnError(uint32_t errorCode, const char *errorMessage) { - assert(_req != 0); + assert(_req); _req->SetError(errorCode, errorMessage); _req->Return(); - _req = 0; + _req = nullptr; } void RPCRequestWrapper::addReturnString(const char *str, uint32_t len) { - assert(_req != 0); + assert(_req); if (len !=0) { _req->GetReturn()->AddString(str, len); } else { @@ -68,16 +68,16 @@ RPCRequestWrapper::addReturnString(const char *str, uint32_t len) void RPCRequestWrapper::addReturnInt(uint32_t value) { - assert(_req != 0); + assert(_req); _req->GetReturn()->AddInt32(value); } void RPCRequestWrapper::returnRequest() { - assert(_req != 0); + assert(_req); _req->Return(); - _req = 0; + _req = nullptr; } @@ -89,7 +89,7 @@ RPCRequestWrapper::getMethodName() const { void RPCRequestWrapper::discardBlobs() { - if (_req != 0) { + if (_req) { _req->DiscardBlobs(); } } diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h index 8412e911601..e07163613ea 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h @@ -50,6 +50,8 @@ public: **/ void returnError(uint32_t errorCode, const char *errorMessage); + FRT_RPCRequest* raw_request() noexcept { return _req; } + const char *getMethodName() const; void addReturnString(const char *str, uint32_t len=0); void addReturnInt(uint32_t value); @@ -66,7 +68,7 @@ private: RPCRequestWrapper(const RPCRequestWrapper &); RPCRequestWrapper &operator=(const RPCRequestWrapper &); - FRT_RPCRequest *_req; // underlying RPC request + FRT_RPCRequest* _req; // underlying RPC request }; } // namespace storage |