summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-08 13:42:12 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:14 +0000
commit54150406f3c490463bc5371c9480452da168bd5c (patch)
tree73721c84df79dd649c967a5fe9f0525431476af6 /storage
parentea22ec7cb6e35cf591d08ffe898388a7f08593cc (diff)
Basic functionality for direct RPC for StorageAPI communication
This has several advantages: * Completely bypasses all MessageBus indirections * Explicit setup of RPC thread pool * Direct dispatch from RPC thread to persistence queue pool * Better control of encoding/decoding and buffer usage
Diffstat (limited to 'storage')
-rw-r--r--storage/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/CMakeLists.txt2
-rw-r--r--storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp (renamed from storage/src/tests/storageserver/fnet_listener_test.cpp)55
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def4
-rw-r--r--storage/src/vespa/storage/storageserver/CMakeLists.txt4
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp66
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h21
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h49
-rw-r--r--storage/src/vespa/storage/storageserver/message_enqueuer.h1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/.gitignore2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt24
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp82
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h32
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp (renamed from storage/src/vespa/storage/storageserver/fnetlistener.cpp)142
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h50
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h (renamed from storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h)4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h (renamed from storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h)2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto15
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp17
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.h27
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp74
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h49
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp (renamed from storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp)2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h (renamed from storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h)2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp302
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h75
-rw-r--r--storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp26
-rw-r--r--storage/src/vespa/storage/storageserver/rpcrequestwrapper.h4
28 files changed, 920 insertions, 214 deletions
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index dc0a0194d36..cbfd4f1606d 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -39,6 +39,7 @@ vespa_define_module(
src/vespa/storage/persistence
src/vespa/storage/persistence/filestorage
src/vespa/storage/storageserver
+ src/vespa/storage/storageserver/rpc
src/vespa/storage/storageutil
src/vespa/storage/tools
src/vespa/storage/visiting
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
index e49a69414b7..680779f27cb 100644
--- a/storage/src/tests/storageserver/CMakeLists.txt
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -14,7 +14,7 @@ vespa_add_executable(storage_storageserver_gtest_runner_app TEST
communicationmanagertest.cpp
configurable_bucket_resolver_test.cpp
documentapiconvertertest.cpp
- fnet_listener_test.cpp
+ cluster_controller_rpc_api_service_test.cpp
mergethrottlertest.cpp
priorityconvertertest.cpp
service_layer_error_listener_test.cpp
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp
index f82af1f8e5c..7510e0579ea 100644
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp
@@ -1,25 +1,27 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/storageserver/fnetlistener.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/storage/storageserver/message_enqueuer.h>
#include <vespa/storage/storageserver/rpcrequestwrapper.h>
-#include <vespa/storage/storageserver/slime_cluster_state_bundle_codec.h>
+#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vdstestlib/config/dirconfig.hpp>
-#include <vespa/messagebus/testlib/slobrok.h>
#include <tests/common/testhelper.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vector>
-namespace storage {
+namespace storage::rpc {
using document::FixedBucketSpaces;
using namespace ::testing;
-struct FNetListenerTest : Test {
+struct ClusterControllerApiRpcServiceTest : Test {
};
namespace {
@@ -38,11 +40,11 @@ struct DummyReturnHandler : FRT_IReturnHandler {
};
struct FixtureBase {
- // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
mbus::Slobrok slobrok;
vdstestlib::DirConfig config;
MockOperationEnqueuer enqueuer;
- std::unique_ptr<FNetListener> fnet_listener;
+ std::unique_ptr<SharedRpcResources> shared_rpc_resources;
+ std::unique_ptr<ClusterControllerApiRpcService> cc_service;
DummyReturnHandler return_handler;
bool request_is_detached{false};
FRT_RPCRequest* bound_request{nullptr};
@@ -52,7 +54,10 @@ struct FixtureBase {
{
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
- fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
+
+ shared_rpc_resources = std::make_unique<SharedRpcResources>(config.getConfigId(), 0, 1);
+ cc_service = std::make_unique<ClusterControllerApiRpcService>(enqueuer, *shared_rpc_resources);
+ shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test");
}
virtual ~FixtureBase() {
@@ -99,22 +104,22 @@ struct SetStateFixture : FixtureBase {
void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) {
create_request(bundle);
- fnet_listener->RPC_setDistributionStates(bound_request);
+ cc_service->RPC_setDistributionStates(bound_request);
assert_enqueued_operation_has_bundle(bundle);
}
void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) {
- fnet_listener->RPC_setDistributionStates(bound_request);
+ cc_service->RPC_setDistributionStates(bound_request);
ASSERT_FALSE(request_is_detached);
ASSERT_TRUE(bound_request->IsError());
ASSERT_EQ(static_cast<uint32_t>(error_code), bound_request->GetErrorCode());
}
- lib::ClusterStateBundle dummy_baseline_bundle() const {
+ static lib::ClusterStateBundle dummy_baseline_bundle() {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
}
- lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const {
+ static lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred);
}
};
@@ -134,14 +139,14 @@ vespalib::string make_compressable_state_string() {
} // anon namespace
-TEST_F(FNetListenerTest, baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle) {
+TEST_F(ClusterControllerApiRpcServiceTest, baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle) {
SetStateFixture f;
auto baseline = f.dummy_baseline_bundle();
f.assert_request_received_and_propagated(baseline);
}
-TEST_F(FNetListenerTest, set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle) {
+TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle) {
SetStateFixture f;
lib::ClusterStateBundle spaces_bundle(
lib::ClusterState("version:123 distributor:3 storage:3"),
@@ -151,7 +156,7 @@ TEST_F(FNetListenerTest, set_distribution_states_rpc_with_derived_enqueues_comma
f.assert_request_received_and_propagated(spaces_bundle);
}
-TEST_F(FNetListenerTest, compressed_bundle_is_transparently_uncompressed) {
+TEST_F(ClusterControllerApiRpcServiceTest, compressed_bundle_is_transparently_uncompressed) {
SetStateFixture f;
auto state_str = make_compressable_state_string();
lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
@@ -160,38 +165,38 @@ TEST_F(FNetListenerTest, compressed_bundle_is_transparently_uncompressed) {
// First verify that the bundle is sent in compressed form
ASSERT_LT(f.bound_request->GetParams()->GetValue(2)._data._len, state_str.size());
// Ensure we uncompress it to the original form
- f.fnet_listener->RPC_setDistributionStates(f.bound_request);
+ f.cc_service->RPC_setDistributionStates(f.bound_request);
f.assert_enqueued_operation_has_bundle(compressable_bundle);
}
-TEST_F(FNetListenerTest, set_distribution_rpc_is_immediately_failed_if_listener_is_closed) {
+TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_rpc_is_immediately_failed_if_listener_is_closed) {
SetStateFixture f;
f.create_request(f.dummy_baseline_bundle());
- f.fnet_listener->close();
+ f.cc_service->close();
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
}
-TEST_F(FNetListenerTest, overly_large_uncompressed_bundle_size_parameter_returns_rpc_error) {
+TEST_F(ClusterControllerApiRpcServiceTest, overly_large_uncompressed_bundle_size_parameter_returns_rpc_error) {
SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
- f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
+ f.bind_request_params(encoded_bundle, ClusterControllerApiRpcService::StateBundleMaxUncompressedSize + 1);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
-TEST_F(FNetListenerTest, mismatching_uncompressed_bundle_size_parameter_returns_rpc_error) {
+TEST_F(ClusterControllerApiRpcServiceTest, mismatching_uncompressed_bundle_size_parameter_returns_rpc_error) {
SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
-TEST_F(FNetListenerTest, true_deferred_activation_flag_can_be_roundtrip_encoded) {
+TEST_F(ClusterControllerApiRpcServiceTest, true_deferred_activation_flag_can_be_roundtrip_encoded) {
SetStateFixture f;
f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true));
}
-TEST_F(FNetListenerTest, false_deferred_activation_flag_can_be_roundtrip_encoded) {
+TEST_F(ClusterControllerApiRpcServiceTest, false_deferred_activation_flag_can_be_roundtrip_encoded) {
SetStateFixture f;
f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false));
}
@@ -224,12 +229,12 @@ struct ActivateStateFixture : FixtureBase {
void assert_request_received_and_propagated(uint32_t activate_version) {
create_request(activate_version);
- fnet_listener->RPC_activateClusterStateVersion(bound_request);
+ cc_service->RPC_activateClusterStateVersion(bound_request);
assert_enqueued_operation_has_activate_version(activate_version);
}
};
-TEST_F(FNetListenerTest, activate_cluster_state_version_rpc_enqueues_command_with_version) {
+TEST_F(ClusterControllerApiRpcServiceTest, activate_cluster_state_version_rpc_enqueues_command_with_version) {
ActivateStateFixture f;
f.assert_request_received_and_propagated(1234567);
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 3e4b1fd6515..29f58087ffa 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -55,3 +55,7 @@ mbus.skip_request_thread bool default=false
## Skip communication manager thread on mbus requests
## Experimental
skip_thread bool default=false
+
+## Whether to use direct P2P RPC protocol for all StorageAPI communication
+## instead of going via MessageBus.
+use_direct_storageapi_rpc bool default=false
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt
index 606d61ab944..538bff5b9f0 100644
--- a/storage/src/vespa/storage/storageserver/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt
@@ -1,4 +1,5 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
vespa_add_library(storage_storageserver
SOURCES
bouncer.cpp
@@ -12,7 +13,6 @@ vespa_add_library(storage_storageserver
distributornodecontext.cpp
documentapiconverter.cpp
fnet_metrics_wrapper.cpp
- fnetlistener.cpp
mergethrottler.cpp
messagesink.cpp
opslogger.cpp
@@ -21,13 +21,13 @@ vespa_add_library(storage_storageserver
service_layer_error_listener.cpp
servicelayernode.cpp
servicelayernodecontext.cpp
- slime_cluster_state_bundle_codec.cpp
statemanager.cpp
statereporter.cpp
storagemetricsset.cpp
storagenode.cpp
storagenodecontext.cpp
tls_statistics_metrics_wrapper.cpp
+ $<TARGET_OBJECTS:storage_storageserver_rpc>
INSTALL lib64
DEPENDS
storage
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index b51394e2e64..283c487f03d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -1,8 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "communicationmanager.h"
-#include "fnetlistener.h"
+#include "vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h"
+#include "vespa/storage/storageserver/rpc/storage_api_rpc_service.h"
#include "rpcrequestwrapper.h"
+#include "vespa/storage/storageserver/rpc/shared_rpc_resources.h"
#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/network/rpcnetworkparams.h>
@@ -20,6 +22,7 @@
#include <vespa/log/bufferedlogger.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+#include <string_view>
LOG_SETUP(".communication.manager");
@@ -248,14 +251,17 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
: StorageLink("Communication manager"),
_component(compReg, "communicationmanager"),
_metrics(_component.getLoadTypes()->getMetricLoadTypes()),
- _listener(),
+ _shared_rpc_resources(), // Created upon initial configuration
+ _storage_api_rpc_service(), // (ditto)
+ _cc_rpc_service(), // (ditto)
_eventQueue(),
_mbus(),
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
_thread(),
- _skip_thread(false)
+ _skip_thread(false),
+ _use_direct_storageapi_rpc(false)
{
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
_component.registerMetric(_metrics);
@@ -270,8 +276,8 @@ CommunicationManager::onOpen()
framework::MilliSecTime maxProcessingTime(60 * 1000);
_thread = _component.startThread(*this, maxProcessingTime);
- if (_listener) {
- _listener->registerHandle(_component.getIdentity());
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->start_server_and_register_slobrok(_component.getIdentity());
}
}
@@ -313,8 +319,13 @@ void CommunicationManager::onClose()
}
}
- if (_listener) {
- _listener->close();
+ // TODO remove? this no longer has any particularly useful semantics
+ if (_cc_rpc_service) {
+ _cc_rpc_service->close();
+ }
+ // TODO do this after we drain queues?
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->shutdown();
}
// Stopping pumper thread should stop all incoming messages from being
@@ -327,6 +338,7 @@ void CommunicationManager::onClose()
}
// Emptying remaining queued messages
+ // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this
std::shared_ptr<api::StorageMessage> msg;
api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down");
while (_eventQueue.size() > 0) {
@@ -362,9 +374,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
- if (_listener->getListenPort() != config->rpcport) {
+ if (_shared_rpc_resources->listen_port() != config->rpcport) {
auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.",
- _listener->getListenPort(), config->rpcport);
+ _shared_rpc_resources->listen_port(), config->rpcport);
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
@@ -407,7 +419,15 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
configureMessageBusLimits(*config);
}
- _listener = std::make_unique<FNetListener>(*this, _configUri, config->rpcport);
+ // TODO temporary!
+ auto repo_getter = [this]{ return _component.getTypeRepo()->documentTypeRepo; };
+ auto loadtype_getter = [this]{ return _component.getLoadTypes(); };
+
+ _use_direct_storageapi_rpc = config->useDirectStorageapiRpc;
+ // TODO configurable thread pool size
+ _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, 1/*pool size*/);
+ _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
+ _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(*this, *_shared_rpc_resources, std::move(repo_getter), std::move(loadtype_getter));
if (_mbus) {
mbus::DestinationSessionParams dstParams;
@@ -542,15 +562,18 @@ CommunicationManager::sendCommand(
switch (address.getProtocol()) {
case api::StorageMessageAddress::STORAGE:
{
- LOG(spam, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
-
- auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
+ LOG(info, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
+ if (_use_direct_storageapi_rpc) {
+ _storage_api_rpc_service->send_rpc_v1_request(msg);
+ } else {
+ auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
- cmd->setContext(mbus::Context(msg->getMsgId()));
- cmd->setRetryEnabled(address.retryEnabled());
- cmd->setTimeRemaining(msg->getTimeout());
- cmd->setTrace(msg->getTrace());
- sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
+ cmd->setContext(mbus::Context(msg->getMsgId()));
+ cmd->setRetryEnabled(address.retryEnabled());
+ cmd->setTimeRemaining(msg->getTimeout());
+ cmd->setTrace(msg->getTrace());
+ sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
+ }
break;
}
case api::StorageMessageAddress::DOCUMENT:
@@ -601,8 +624,11 @@ CommunicationManager::sendDirectRPCReply(
RPCRequestWrapper& request,
const std::shared_ptr<api::StorageReply>& reply)
{
- std::string requestName(request.getMethodName());
- if (requestName == "getnodestate3") {
+ std::string_view requestName(request.getMethodName()); // TODO non-name based dispatch
+ // TODO rework this entire dispatch mechanism :D
+ if (requestName == "storageapi.v1.send") {
+ _storage_api_rpc_service->encode_rpc_v1_response(*request.raw_request(), *reply);
+ } else if (requestName == "getnodestate3") {
auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
std::ostringstream ns;
serializeNodeState(gns, ns, true, true, false);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 23b59f5a42a..6e419ea6f19 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -37,19 +37,23 @@ namespace mbus {
}
namespace storage {
+namespace rpc {
+class ClusterControllerApiRpcService;
+class SharedRpcResources;
+class StorageApiRpcService;
+}
+
struct BucketResolver;
-class VisitorMbusSession;
class Visitor;
class VisitorThread;
-class FNetListener;
class RPCRequestWrapper;
class StorageTransportContext : public api::TransportContext {
public:
- StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
- StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg);
- StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
- ~StorageTransportContext();
+ explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
+ explicit StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg);
+ explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
+ ~StorageTransportContext() override;
std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg;
@@ -72,7 +76,9 @@ private:
StorageComponent _component;
CommunicationManagerMetrics _metrics;
- std::unique_ptr<FNetListener> _listener;
+ std::unique_ptr<rpc::SharedRpcResources> _shared_rpc_resources;
+ std::unique_ptr<rpc::StorageApiRpcService> _storage_api_rpc_service;
+ std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service;
Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
@@ -112,6 +118,7 @@ private:
DocumentApiConverter _docApiConverter;
framework::Thread::UP _thread;
bool _skip_thread;
+ bool _use_direct_storageapi_rpc;
void updateMetrics(const MetricLockGuard &) override;
void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg);
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
deleted file mode 100644
index e37727beb44..00000000000
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/slobrok/sbregister.h>
-#include <atomic>
-
-class FNET_Transport;
-class FastOS_ThreadPool;
-
-namespace storage {
-
-namespace api { class StorageMessage; }
-
-class MessageEnqueuer;
-
-class FNetListener : public FRT_Invokable
-{
-public:
- static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16;
-
- FNetListener(MessageEnqueuer& messageEnqueuer,
- const config::ConfigUri & configUri,
- uint32_t port);
- ~FNetListener() override;
-
- void initRPC();
- void RPC_getNodeState2(FRT_RPCRequest *req);
- void RPC_setSystemState2(FRT_RPCRequest *req);
- void RPC_getCurrentTime(FRT_RPCRequest *req);
- void RPC_setDistributionStates(FRT_RPCRequest* req);
- void RPC_activateClusterStateVersion(FRT_RPCRequest* req);
-
- void registerHandle(vespalib::stringref handle);
- void close();
- int getListenPort() const;
-
-private:
- MessageEnqueuer& _messageEnqueuer;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::atomic<bool> _closed;
- slobrok::api::RegisterAPI _slobrokRegister;
- vespalib::string _handle;
-
- void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
-};
-
-}
diff --git a/storage/src/vespa/storage/storageserver/message_enqueuer.h b/storage/src/vespa/storage/storageserver/message_enqueuer.h
index 921328a054b..aaa988940d3 100644
--- a/storage/src/vespa/storage/storageserver/message_enqueuer.h
+++ b/storage/src/vespa/storage/storageserver/message_enqueuer.h
@@ -11,6 +11,7 @@ namespace api { class StorageMessage; }
class MessageEnqueuer {
public:
virtual ~MessageEnqueuer() = default;
+ // TODO separate into explicit direct dispatch and threaded enqueue
virtual void enqueue(std::shared_ptr<api::StorageMessage> msg) = 0;
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/.gitignore b/storage/src/vespa/storage/storageserver/rpc/.gitignore
new file mode 100644
index 00000000000..3221e7351af
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/.gitignore
@@ -0,0 +1,2 @@
+*.pb.h
+*.pb.cc \ No newline at end of file
diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
new file mode 100644
index 00000000000..f42656068f6
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+PROTOBUF_GENERATE_CPP(storage_storageserver_rpc_PROTOBUF_SRCS storage_storageserver_rpc_PROTOBUF_HDRS
+ protobuf/rpc_envelope.proto
+)
+
+vespa_add_source_target(protobufgen_storage_storageserver_rpc DEPENDS
+ ${storage_storageserver_rpc_PROTOBUF_SRCS}
+ ${storage_storageserver_rpc_PROTOBUF_HDRS})
+
+vespa_suppress_warnings_for_protobuf_sources(SOURCES ${storage_storageserver_rpc_PROTOBUF_SRCS})
+
+vespa_add_library(storage_storageserver_rpc OBJECT
+ SOURCES
+ caching_rpc_target_resolver.cpp
+ cluster_controller_api_rpc_service.cpp
+ rpc_target.cpp
+ shared_rpc_resources.cpp
+ slime_cluster_state_bundle_codec.cpp
+ storage_api_rpc_service.cpp
+ ${storage_storageserver_rpc_PROTOBUF_SRCS}
+ DEPENDS
+) \ No newline at end of file
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
new file mode 100644
index 00000000000..7ccd01e2ccc
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
@@ -0,0 +1,82 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "caching_rpc_target_resolver.h"
+#include "shared_rpc_resources.h"
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.caching_rpc_target_resolver");
+
+namespace storage::rpc {
+
+CachingRpcTargetResolver::CachingRpcTargetResolver(SharedRpcResources& rpc_resources)
+ : _rpc_resources(rpc_resources)
+{
+}
+
+CachingRpcTargetResolver::~CachingRpcTargetResolver() = default;
+
+namespace {
+
+vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address) {
+ vespalib::asciistream as;
+ as << "storage/cluster." << address.getCluster()
+ << '/' << ((address.getNodeType() == lib::NodeType::STORAGE) ? "storage" : "distributor")
+ << '/' << address.getIndex();
+ return as.str();
+}
+
+}
+
+// TODO ensure this is robust and performant wrt. visitor clients constantly bumping
+// slobrok generations by registering new sessions all the time.
+std::shared_ptr<RpcTarget>
+CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) {
+ // TODO or map directly from address to target instead of going via stringification? Needs hashing, if so.
+ auto sb_id = address_to_slobrok_id(address);
+ const uint32_t current_sb_gen = _rpc_resources.slobrok_mirror().updates();
+ {
+ std::shared_lock lock(_targets_rwmutex);
+ auto target_iter = _targets.find(sb_id);
+ if ((target_iter != _targets.end())
+ && target_iter->second->_target->IsValid()
+ && (target_iter->second->_sb_generation == current_sb_gen))
+ {
+ return target_iter->second;
+ }
+ }
+ auto specs = _rpc_resources.slobrok_mirror().lookup(sb_id); // FIXME string type mismatch; implicit conv!
+ if (specs.empty()) {
+ LOG(info, "Found no mapping for %s", sb_id.c_str());
+ // TODO return potentially stale existing target if no longer existing in SB?
+ // TODO or clear any existing mapping?
+ return {};
+ }
+ const auto& candidate_spec = specs[0].second; // Always use first spec in list. TODO correct?
+ std::unique_lock lock(_targets_rwmutex);
+ // If address has the same spec as the existing target, just reuse it.
+ auto target_iter = _targets.find(sb_id);
+ if ((target_iter != _targets.end())
+ && (target_iter->second->_target->IsValid())
+ && (target_iter->second->_spec == candidate_spec))
+ {
+ LOG(info, "Updating existing mapping %s -> %s (gen %u) to gen %u",
+ sb_id.c_str(), candidate_spec.c_str(), target_iter->second->_sb_generation, current_sb_gen);
+ target_iter->second->_sb_generation = current_sb_gen;
+ return target_iter->second;
+ }
+ // Insert new mapping or update the old one.
+ auto* raw_target = _rpc_resources.supervisor().GetTarget(candidate_spec.c_str()); // TODO expensive inside lock?
+ assert(raw_target);
+ auto rpc_target = std::make_shared<RpcTarget>(raw_target, candidate_spec, current_sb_gen);
+ _targets[sb_id] = rpc_target;
+ LOG(info, "Added mapping %s -> %s at gen %u", sb_id.c_str(), candidate_spec.c_str(), current_sb_gen);
+ return rpc_target;
+}
+
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
new file mode 100644
index 00000000000..ac91019806b
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpc_target.h"
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <memory>
+#include <shared_mutex>
+
+namespace storage {
+
+namespace api { class StorageMessageAddress; }
+
+namespace rpc {
+
+class SharedRpcResources;
+
+class CachingRpcTargetResolver {
+ SharedRpcResources& _rpc_resources;
+ mutable std::shared_mutex _targets_rwmutex;
+ // TODO LRU? Size cap?
+ vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets;
+public:
+ // TODO pass explicit slobrok mirror interface and supervisor to make testing easier
+ // TODO consider wrapping supervisor to make testing easier
+ explicit CachingRpcTargetResolver(SharedRpcResources& rpc_resources);
+ ~CachingRpcTargetResolver();
+
+ std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address);
+};
+
+} // rpc
+} // storage
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
index c5d7880d966..09a42887a5d 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
@@ -1,119 +1,81 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "fnetlistener.h"
-#include "communicationmanager.h"
-#include "rpcrequestwrapper.h"
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cluster_controller_api_rpc_service.h"
+#include "shared_rpc_resources.h"
#include "slime_cluster_state_bundle_codec.h"
+#include <vespa/storage/storageserver/communicationmanager.h>
+#include <vespa/storage/storageserver/message_enqueuer.h>
+#include <vespa/storage/storageserver/rpcrequestwrapper.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
-#include <vespa/vespalib/util/time.h>
-#include <vespa/fnet/frt/supervisor.h>
-#include <vespa/fnet/transport.h>
-#include <sstream>
-#include <thread>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/log.h>
-LOG_SETUP(".rpc.listener");
-
-namespace storage {
-
-FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
- : _messageEnqueuer(messageEnqueuer),
- _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
- _transport(std::make_unique<FNET_Transport>()),
- _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
- _closed(false),
- _slobrokRegister(*_orb, configUri)
-{
- initRPC();
- if (!_orb->Listen(port)) {
- std::ostringstream ost;
- ost << "Failed to listen to RPC port " << port << ".";
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- _transport->Start(_threadPool.get());
-}
+LOG_SETUP(".storage.cluster_controller_api_rpc_service");
-FNetListener::~FNetListener()
-{
- if (!_closed) {
- close();
- }
-}
+namespace storage::rpc {
-int
-FNetListener::getListenPort() const
+ClusterControllerApiRpcService::ClusterControllerApiRpcService(
+ MessageEnqueuer& message_enqueuer,
+ SharedRpcResources& rpc_resources)
+ : _message_enqueuer(message_enqueuer),
+ _rpc_resources(rpc_resources),
+ _closed(false)
{
- return _orb->GetListenPort();
+ register_server_methods(rpc_resources);
}
-void
-FNetListener::registerHandle(vespalib::stringref handle) {
- _slobrokRegister.registerName(handle);
- while (_slobrokRegister.busy()) {
- LOG(debug, "Waiting to register in slobrok");
- std::this_thread::sleep_for(50ms);
- }
- _handle = handle;
-}
+ClusterControllerApiRpcService::~ClusterControllerApiRpcService() = default;
-void
-FNetListener::close()
-{
- _closed = true;
- _slobrokRegister.unregisterName(_handle);
- _transport->ShutDown(true);
+void ClusterControllerApiRpcService::close() {
+ _closed.store(true);
}
-void
-FNetListener::initRPC()
-{
- FRT_ReflectionBuilder rb(_orb.get());
+void ClusterControllerApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) {
+ FRT_ReflectionBuilder rb(&rpc_resources.supervisor());
- rb.DefineMethod("getnodestate3", "sii", "ss", FRT_METHOD(FNetListener::RPC_getNodeState2), this);
+ rb.DefineMethod("getnodestate3", "sii", "ss", FRT_METHOD(ClusterControllerApiRpcService::RPC_getNodeState2), this);
rb.MethodDesc("Get state of this node");
rb.ParamDesc("nodestate", "Expected state of given node. If correct, the "
- "request will be queued on target until it changes. To not give "
- "any state use the string 'unknown', enforcing a direct reply.");
+ "request will be queued on target until it changes. To not give "
+ "any state use the string 'unknown', enforcing a direct reply.");
rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester");
rb.ReturnDesc("nodestate", "State string for this node");
rb.ReturnDesc("hostinfo", "Information about host this node is running on");
//-------------------------------------------------------------------------
- rb.DefineMethod("getnodestate2", "si", "s", FRT_METHOD(FNetListener::RPC_getNodeState2), this);
+ rb.DefineMethod("getnodestate2", "si", "s", FRT_METHOD(ClusterControllerApiRpcService::RPC_getNodeState2), this);
rb.MethodDesc("Get state of this node");
rb.ParamDesc("nodestate", "Expected state of given node. If correct, the "
- "request will be queued on target until it changes. To not give "
- "any state use the string 'unknown', enforcing a direct reply.");
+ "request will be queued on target until it changes. To not give "
+ "any state use the string 'unknown', enforcing a direct reply.");
rb.ParamDesc("timeout", "Timeout of message in milliseconds, set by the state requester");
rb.ReturnDesc("nodestate", "State string for this node");
//-------------------------------------------------------------------------
- rb.DefineMethod("setsystemstate2", "s", "", FRT_METHOD(FNetListener::RPC_setSystemState2), this);
+ rb.DefineMethod("setsystemstate2", "s", "", FRT_METHOD(ClusterControllerApiRpcService::RPC_setSystemState2), this);
rb.MethodDesc("Set systemstate on this node");
rb.ParamDesc("systemstate", "New systemstate to set");
//-------------------------------------------------------------------------
- rb.DefineMethod("setdistributionstates", "bix", "", FRT_METHOD(FNetListener::RPC_setDistributionStates), this);
+ rb.DefineMethod("setdistributionstates", "bix", "", FRT_METHOD(ClusterControllerApiRpcService::RPC_setDistributionStates), this);
rb.MethodDesc("Set distribution states for cluster and bucket spaces");
rb.ParamDesc("compressionType", "Compression type for payload");
rb.ParamDesc("uncompressedSize", "Uncompressed size for payload");
rb.ParamDesc("payload", "Binary Slime format payload");
//-------------------------------------------------------------------------
- rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this);
+ rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(ClusterControllerApiRpcService::RPC_activateClusterStateVersion), this);
rb.MethodDesc("Explicitly activates an already prepared cluster state version");
rb.ParamDesc("activate_version", "Expected cluster state version to activate");
rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC");
//-------------------------------------------------------------------------
- rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this);
+ rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(ClusterControllerApiRpcService::RPC_getCurrentTime), this);
rb.MethodDesc("Get current time on this node");
rb.ReturnDesc("seconds", "Current time in seconds since epoch");
rb.ReturnDesc("nanoseconds", "additional nanoseconds since epoch");
rb.ReturnDesc("hostname", "Host name");
- //-------------------------------------------------------------------------
}
-
-void
-FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req)
-{
+// TODO remove? is this used by anyone?
+void ClusterControllerApiRpcService::RPC_getCurrentTime(FRT_RPCRequest* req) {
if (_closed) {
LOG(debug, "Not handling RPC call getCurrentTime() as we have closed");
req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
@@ -127,19 +89,19 @@ FNetListener::RPC_getCurrentTime(FRT_RPCRequest *req)
vespalib::string hostname = vespalib::HostName::get();
req->GetReturn()->AddString(hostname.c_str());
// all handled, will return immediately
- return;
}
-void FNetListener::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req) {
+void ClusterControllerApiRpcService::detach_and_forward_to_enqueuer(
+ std::shared_ptr<api::StorageMessage> cmd,
+ FRT_RPCRequest* req)
+{
// Create a request object to avoid needing a separate transport type
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
req->Detach();
- _messageEnqueuer.enqueue(std::move(cmd));
+ _message_enqueuer.enqueue(std::move(cmd));
}
-void
-FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
-{
+void ClusterControllerApiRpcService::RPC_getNodeState2(FRT_RPCRequest* req) {
if (_closed) {
LOG(debug, "Not handling RPC call getNodeState2() as we have closed");
req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
@@ -147,11 +109,11 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
}
vespalib::string expected(req->GetParams()->GetValue(0)._string._str,
- req->GetParams()->GetValue(0)._string._len);
+ req->GetParams()->GetValue(0)._string._len);
- auto cmd(std::make_shared<api::GetNodeStateCommand>(expected != "unknown"
- ? std::make_unique<lib::NodeState>(expected)
- : std::unique_ptr<lib::NodeState>()));
+ auto cmd = std::make_shared<api::GetNodeStateCommand>(expected != "unknown"
+ ? std::make_unique<lib::NodeState>(expected)
+ : std::unique_ptr<lib::NodeState>());
cmd->setPriority(api::StorageMessage::VERYHIGH);
cmd->setTimeout(std::chrono::milliseconds(req->GetParams()->GetValue(1)._intval32));
@@ -161,9 +123,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
detach_and_forward_to_enqueuer(std::move(cmd), req);
}
-void
-FNetListener::RPC_setSystemState2(FRT_RPCRequest *req)
-{
+void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) {
if (_closed) {
LOG(debug, "Not handling RPC call setSystemState2() as we have closed");
req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
@@ -173,7 +133,7 @@ FNetListener::RPC_setSystemState2(FRT_RPCRequest *req)
req->GetParams()->GetValue(0)._string._len);
lib::ClusterState systemState(systemStateStr);
- auto cmd(std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)));
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState));
cmd->setPriority(api::StorageMessage::VERYHIGH);
detach_and_forward_to_enqueuer(std::move(cmd), req);
@@ -183,10 +143,10 @@ namespace {
std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const FRT_Values& params) {
const uint32_t uncompressed_length = params[1]._intval32;
- if (uncompressed_length > FNetListener::StateBundleMaxUncompressedSize) {
+ if (uncompressed_length > ClusterControllerApiRpcService::StateBundleMaxUncompressedSize) {
throw std::range_error(vespalib::make_string("RPC ClusterStateBundle uncompressed size (%u) is "
"greater than max size (%u)", uncompressed_length,
- FNetListener::StateBundleMaxUncompressedSize));
+ ClusterControllerApiRpcService::StateBundleMaxUncompressedSize));
}
SlimeClusterStateBundleCodec codec;
EncodedClusterStateBundle encoded_bundle;
@@ -200,7 +160,7 @@ std::shared_ptr<const lib::ClusterStateBundle> decode_bundle_from_params(const F
}
-void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
+void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* req) {
if (_closed) {
LOG(debug, "Not handling RPC call setDistributionStates() as we have closed");
req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
@@ -223,7 +183,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
detach_and_forward_to_enqueuer(std::move(cmd), req);
}
-void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) {
+void ClusterControllerApiRpcService::RPC_activateClusterStateVersion(FRT_RPCRequest* req) {
if (_closed) {
LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed");
req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down");
diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h
new file mode 100644
index 00000000000..837f00fad5e
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h
@@ -0,0 +1,50 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fnet/frt/invokable.h>
+#include <atomic>
+#include <memory>
+
+class FRT_RPCRequest;
+
+namespace storage {
+
+class MessageEnqueuer;
+
+namespace api {
+class StorageCommand;
+class StorageMessage;
+class StorageMessageAddress;
+class StorageReply;
+}
+
+namespace rpc {
+
+class SharedRpcResources;
+
+class ClusterControllerApiRpcService : public FRT_Invokable {
+ MessageEnqueuer& _message_enqueuer;
+ SharedRpcResources& _rpc_resources;
+ std::atomic<bool> _closed;
+public:
+ static constexpr uint32_t StateBundleMaxUncompressedSize = 1024 * 1024 * 16;
+
+ ClusterControllerApiRpcService(MessageEnqueuer& message_enqueuer,
+ SharedRpcResources& rpc_resources);
+ ~ClusterControllerApiRpcService() override;
+
+ void close();
+
+ void RPC_getNodeState2(FRT_RPCRequest* req);
+ void RPC_setSystemState2(FRT_RPCRequest* req);
+ void RPC_getCurrentTime(FRT_RPCRequest* req);
+ void RPC_setDistributionStates(FRT_RPCRequest* req);
+ void RPC_activateClusterStateVersion(FRT_RPCRequest* req);
+private:
+ void register_server_methods(SharedRpcResources&);
+ // TODO factor out as shared functionality
+ void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
+};
+
+} // rpc
+} // storage
diff --git a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h
index 41c5db9876d..ea6ef2649d0 100644
--- a/storage/src/vespa/storage/storageserver/cluster_state_bundle_codec.h
+++ b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h
@@ -4,9 +4,9 @@
#include "encoded_cluster_state_bundle.h"
-namespace storage {
+namespace storage::lib { class ClusterStateBundle; }
-namespace lib { class ClusterStateBundle; }
+namespace storage::rpc {
/**
* Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
diff --git a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h
index 6f25a6b67a6..92e66aab378 100644
--- a/storage/src/vespa/storage/storageserver/encoded_cluster_state_bundle.h
+++ b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h
@@ -5,7 +5,7 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
-namespace storage {
+namespace storage::rpc {
/**
* Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
diff --git a/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto
new file mode 100644
index 00000000000..2eab6068bad
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto
@@ -0,0 +1,15 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.protobuf;
+
+message RequestHeader {
+ uint64 time_remaining_ms = 1;
+ uint32 trace_level = 2;
+}
+
+message ResponseHeader {
+ bytes trace_payload = 1;
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
new file mode 100644
index 00000000000..ddf881d3bd3
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
@@ -0,0 +1,17 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "rpc_target.h"
+#include <vespa/fnet/frt/target.h>
+
+namespace storage::rpc {
+
+RpcTarget::RpcTarget(FRT_Target* target, vespalib::stringref spec, uint32_t sb_generation)
+ : _target(target),
+ _spec(spec),
+ _sb_generation(sb_generation)
+{}
+
+RpcTarget::~RpcTarget() {
+ _target->SubRef();
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
new file mode 100644
index 00000000000..7fd1ddeafaf
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <cstdint>
+
+class FRT_Target;
+
+namespace storage::rpc {
+
+struct RpcTarget {
+ FRT_Target* _target;
+ const vespalib::string _spec;
+ uint32_t _sb_generation;
+
+ // Target must have ref count of at least 1
+ RpcTarget(FRT_Target* target,
+ vespalib::stringref spec,
+ uint32_t sb_generation);
+ RpcTarget(const RpcTarget&) = delete;
+ RpcTarget& operator=(const RpcTarget&) = delete;
+ RpcTarget(RpcTarget&&) = delete;
+ RpcTarget& operator=(RpcTarget&&) = delete;
+ ~RpcTarget();
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
new file mode 100644
index 00000000000..4ce1732c6f8
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -0,0 +1,74 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_rpc_resources.h"
+#include <vespa/fastos/thread.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/slobrok/sbregister.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <cassert>
+#include <chrono>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.shared_rpc_resources");
+
+using namespace std::chrono_literals;
+
+namespace storage::rpc {
+
+SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
+ int rpc_server_port,
+ size_t rpc_thread_pool_size)
+ : _thread_pool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>(rpc_thread_pool_size)),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
+ _slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, config_uri)),
+ _slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config_uri)),
+ _rpc_server_port(rpc_server_port),
+ _shutdown(false)
+{
+}
+
+// TODO make sure init/shutdown is safe for aborted init in comm. mgr.
+
+SharedRpcResources::~SharedRpcResources() {
+ if (!_shutdown) {
+ shutdown();
+ }
+}
+
+void SharedRpcResources::start_server_and_register_slobrok(vespalib::stringref my_handle) {
+ LOG(debug, "Starting main RPC supervisor on port %d", _rpc_server_port);
+ if (!_orb->Listen(_rpc_server_port)) {
+ throw vespalib::IllegalStateException(vespalib::make_string("Failed to listen to RPC port %d", _rpc_server_port),
+ VESPA_STRLOC);
+ }
+ _transport->Start(_thread_pool.get());
+ _slobrok_register->registerName(my_handle);
+ wait_until_slobrok_is_ready();
+ _handle = my_handle;
+}
+
+void SharedRpcResources::wait_until_slobrok_is_ready() {
+ // TODO look more closely at how mbus does its slobrok business
+ while (_slobrok_register->busy() || !_slobrok_mirror->ready()) {
+ // TODO some form of timeout mechanism here, and warning logging to identify SB issues
+ LOG(debug, "Waiting for Slobrok to become ready");
+ std::this_thread::sleep_for(50ms);
+ }
+}
+
+void SharedRpcResources::shutdown() {
+ assert(!_shutdown);
+ _slobrok_register->unregisterName(_handle);
+ _transport->ShutDown(true);
+ _shutdown = true;
+}
+
+int SharedRpcResources::listen_port() const noexcept {
+ return _orb->GetListenPort();
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
new file mode 100644
index 00000000000..9b2f1c04249
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -0,0 +1,49 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/config/subscription/configuri.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <memory>
+
+class FastOS_ThreadPool;
+class FNET_Transport;
+class FRT_Supervisor;
+
+namespace slobrok::api {
+class RegisterAPI;
+class MirrorAPI;
+}
+
+namespace storage::rpc {
+
+class SharedRpcResources {
+ std::unique_ptr<FastOS_ThreadPool> _thread_pool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register;
+ std::unique_ptr<slobrok::api::MirrorAPI> _slobrok_mirror;
+ vespalib::string _handle;
+ int _rpc_server_port;
+ bool _shutdown;
+public:
+ SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port, size_t rpc_thread_pool_size);
+ ~SharedRpcResources();
+
+ FRT_Supervisor& supervisor() noexcept { return *_orb; }
+ const FRT_Supervisor& supervisor() const noexcept { return *_orb; }
+
+ slobrok::api::RegisterAPI& slobrok_register() noexcept { return *_slobrok_register; }
+ const slobrok::api::RegisterAPI& slobrok_register() const noexcept { return *_slobrok_register; }
+ slobrok::api::MirrorAPI& slobrok_mirror() noexcept { return *_slobrok_mirror; }
+ const slobrok::api::MirrorAPI& slobrok_mirror() const noexcept { return *_slobrok_mirror; }
+ // To be called after all RPC handlers have been registered.
+ void start_server_and_register_slobrok(vespalib::stringref my_handle);
+
+ void shutdown();
+ [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started
+private:
+ void wait_until_slobrok_is_ready();
+};
+
+
+}
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp
index 1f854bc724e..0e8a3081aa2 100644
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp
@@ -18,7 +18,7 @@ using vespalib::compression::compress;
using vespalib::Memory;
using namespace vespalib::slime;
-namespace storage {
+namespace storage::rpc {
// TODO find a suitable home for this class to avoid dupes with rpcsendv2.cpp
namespace {
diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h
index 1fb95134059..0c25de5faa5 100644
--- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.h
+++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h
@@ -5,7 +5,7 @@
#include "cluster_state_bundle_codec.h"
#include <memory>
-namespace storage {
+namespace storage::rpc {
/**
* Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
new file mode 100644
index 00000000000..4c911f092a3
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -0,0 +1,302 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "storage_api_rpc_service.h"
+#include "caching_rpc_target_resolver.h"
+#include "shared_rpc_resources.h"
+#include "rpc_envelope.pb.h"
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <vespa/storage/storageserver/communicationmanager.h>
+#include <vespa/storage/storageserver/rpcrequestwrapper.h>
+#include <vespa/storageapi/mbusprot/protocolserialization7.h>
+#include <vespa/storageapi/messageapi/storagecommand.h>
+#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/util/compressor.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.storage_api_rpc_service");
+
+namespace storage::rpc {
+
+StorageApiRpcService::StorageApiRpcService(MessageEnqueuer& messageEnqueuer,
+ SharedRpcResources& rpc_resources,
+ std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func,
+ std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func)
+ : _message_enqueuer(messageEnqueuer),
+ _rpc_resources(rpc_resources),
+ // TODO these are temporary, need to be consolidated and moved out
+ _doctype_repo_func(std::move(doctype_repo_func)),
+ _loadtype_set_func(std::move(loadtype_set_func)),
+ _target_resolver(std::make_unique<CachingRpcTargetResolver>(rpc_resources))
+{
+ register_server_methods(rpc_resources);
+}
+
+StorageApiRpcService::~StorageApiRpcService() = default;
+
+void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) {
+ FRT_ReflectionBuilder rb(&rpc_resources.supervisor());
+ rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this);
+ rb.MethodDesc("V1 of StorageAPI direct RPC protocol");
+ rb.ParamDesc("header_encoding", "0=raw, 6=lz4");
+ rb.ParamDesc("header_decoded_size", "Uncompressed header blob size");
+ rb.ParamDesc("header_payload", "The message header blob");
+ rb.ParamDesc("body_encoding", "0=raw, 6=lz4");
+ rb.ParamDesc("body_decoded_size", "Uncompressed body blob size");
+ rb.ParamDesc("body_payload", "The message body blob");
+ rb.ReturnDesc("header_encoding", "0=raw, 6=lz4");
+ rb.ReturnDesc("header_decoded_size", "Uncompressed header blob size");
+ rb.ReturnDesc("header_payload", "The reply header blob");
+ rb.ReturnDesc("body_encoding", "0=raw, 6=lz4");
+ rb.ReturnDesc("body_decoded_size", "Uncompressed body blob size");
+ rb.ReturnDesc("body_payload", "The reply body blob");
+}
+
+void StorageApiRpcService::detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req) {
+ // Create a request object to avoid needing a separate transport type
+ cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::make_unique<RPCRequestWrapper>(req)));
+ req->Detach();
+ _message_enqueuer.enqueue(std::move(cmd));
+}
+
+namespace {
+
+struct SubRefDeleter {
+ template <typename T>
+ void operator()(T* v) const noexcept {
+ v->SubRef();
+ }
+};
+
+template <typename HeaderType>
+bool decode_header_from_rpc_params(const FRT_Values& params, HeaderType& hdr) {
+ const auto compression_type = vespalib::compression::CompressionConfig::toType(params[0]._intval8);
+ const uint32_t uncompressed_length = params[1]._intval32;
+
+ if (compression_type == vespalib::compression::CompressionConfig::NONE) {
+ // Fast-path in the common case where request header is not compressed.
+ return hdr.ParseFromArray(params[2]._data._buf, params[2]._data._len);
+ } else {
+ vespalib::DataBuffer uncompressed(params[2]._data._buf, params[2]._data._len);
+ vespalib::ConstBufferRef blob(params[2]._data._buf, params[2]._data._len);
+ decompress(compression_type, uncompressed_length, blob, uncompressed, true);
+ assert(uncompressed_length == uncompressed.getDataLen());
+ return hdr.ParseFromArray(uncompressed.getData(), uncompressed.getDataLen());
+ }
+}
+
+// Must be done prior to adding payload
+template <typename HeaderType>
+void encode_header_into_rpc_params(HeaderType& hdr, FRT_Values& params) {
+ params.AddInt8(vespalib::compression::CompressionConfig::Type::NONE); // TODO when needed
+ const auto header_size = hdr.ByteSizeLong();
+ assert(header_size <= UINT32_MAX);
+ params.AddInt32(static_cast<uint32_t>(header_size));
+ auto* header_buf = reinterpret_cast<uint8_t*>(params.AddData(header_size));
+ hdr.SerializeWithCachedSizesToArray(header_buf);
+}
+
+void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& params) {
+ assert(payload.size() <= UINT32_MAX);
+ vespalib::ConstBufferRef to_compress(payload.data(), payload.size());
+ vespalib::DataBuffer buf(vespalib::roundUp2inN(payload.size()));
+ // TODO configurable compression config?
+ vespalib::compression::CompressionConfig comp_cfg(vespalib::compression::CompressionConfig::Type::LZ4);
+ auto comp_type = compress(comp_cfg, to_compress, buf, false);
+ assert(buf.getDataLen() <= UINT32_MAX);
+
+ params.AddInt8(comp_type);
+ params.AddInt32(static_cast<uint32_t>(to_compress.size()));
+ params.AddData(buf.stealBuffer(), buf.getDataLen());
+}
+
+} // anon ns
+
+template <typename MessageType>
+void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params) {
+ // Shared ptrs must stay alive
+ // TODO move out, cache
+ auto repo = _doctype_repo_func();
+ auto load_types = _loadtype_set_func();
+ mbusprot::ProtocolSerialization7 codec(repo, *load_types);
+ auto payload = codec.encode(msg);
+
+ compress_and_add_payload_to_rpc_params(payload, params);
+}
+
+template <typename PayloadCodecCallback>
+void StorageApiRpcService::uncompress_rpc_payload(
+ const FRT_Values& params,
+ PayloadCodecCallback payload_callback)
+{
+ const auto compression_type = vespalib::compression::CompressionConfig::toType(params[3]._intval8);
+ const uint32_t uncompressed_length = params[4]._intval32;
+ // TODO fast path if uncompressed?
+ vespalib::DataBuffer uncompressed(params[5]._data._buf, params[5]._data._len);
+ vespalib::ConstBufferRef blob(params[5]._data._buf, params[5]._data._len);
+ decompress(compression_type, uncompressed_length, blob, uncompressed, true);
+ assert(uncompressed_length == uncompressed.getDataLen());
+ assert(uncompressed_length <= UINT32_MAX);
+
+ // Shared ptrs must stay alive
+ // TODO move out, cache
+ auto repo = _doctype_repo_func();
+ auto loadtypes = _loadtype_set_func();
+ mbusprot::ProtocolSerialization7 codec(repo, *loadtypes);
+
+ payload_callback(codec, mbus::BlobRef(uncompressed.getData(), uncompressed_length));
+}
+
+void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
+ LOG(info, "Server: received rpc.v1 request");
+ // TODO do we need to manually check the parameter/return spec here?
+ const auto& params = *req->GetParams();
+ protobuf::RequestHeader hdr;
+ if (!decode_header_from_rpc_params(params, hdr)) {
+ req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request header protobuf");
+ return;
+ }
+ std::unique_ptr<mbusprot::StorageCommand> cmd;
+ uint32_t uncompressed_size = 0;
+ uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) {
+ cmd = codec.decodeCommand(payload);
+ uncompressed_size = static_cast<uint32_t>(payload.size());
+ });
+ if (cmd && cmd->has_command()) {
+ auto scmd = cmd->steal_command();
+ scmd->setApproxByteSize(uncompressed_size);
+ scmd->getTrace().setLevel(hdr.trace_level());
+ scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms()));
+ req->DiscardBlobs();
+ detach_and_forward_to_enqueuer(std::move(scmd), req);
+ } else {
+ req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request payload");
+ }
+}
+
+void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
+ LOG(info, "Server: encoding rpc.v1 response header and payload");
+ auto* ret = request.GetReturn();
+
+ // TODO skip encoding header altogether if no relevant fields set?
+ protobuf::ResponseHeader hdr;
+ if (reply.getTrace().getLevel() > 0) {
+ hdr.set_trace_payload(reply.getTrace().getRoot().encode());
+ }
+ // TODO consistent naming...
+ encode_header_into_rpc_params(hdr, *ret);
+ encode_and_compress_rpc_payload<api::StorageReply>(reply, *ret);
+}
+
+void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd) {
+ LOG(info, "Client: sending rpc.v1 request for message of type %s", cmd->getType().getName().c_str());
+
+ assert(cmd->getAddress() != nullptr);
+ auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress());
+ if (!target) {
+ // TODO need to bounce reply with TODO descriptive error code, mirroring that of RPCNetwork!
+ auto reply = cmd->makeReply();
+ reply->setResult(api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE),
+ "Couldn't find the darn thing in Slobrok")); // TODO :D
+ // TODO always enforce separate thread for this, or we risk nuking the
+ // stack if the reply receiver keeps resending immediately and synchronously.
+ _message_enqueuer.enqueue(std::move(reply)); // TODO dispatch_async_in_separate_thread(std::move(reply))?
+ return;
+ }
+ std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
+ req->SetMethodName("storageapi.v1.send");
+
+ protobuf::RequestHeader req_hdr;
+ req_hdr.set_time_remaining_ms(std::chrono::duration_cast<std::chrono::milliseconds>(cmd->getTimeout()).count());
+ req_hdr.set_trace_level(cmd->getTrace().getLevel());
+
+ auto* params = req->GetParams();
+ encode_header_into_rpc_params(req_hdr, *params);
+ encode_and_compress_rpc_payload<api::StorageCommand>(*cmd, *params);
+
+ const auto timeout = cmd->getTimeout();
+ // TODO verify it's fine that we alloc this on the request stash and use it this way
+ auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd), timeout);
+ req->SetContext(FNET_Context(&req_ctx));
+
+ target->_target->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
+}
+
+namespace {
+
+api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, vespalib::duration timeout) {
+ // TODO determine all codes that must be (re)mapped. Current remapping adapted from RPCSend
+ // TODO need to keep enough state to give peer ID/spec in error messages
+ switch (req.GetErrorCode()) {
+ case FRTE_RPC_TIMEOUT:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT),
+ vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s",
+ "TODO", vespalib::to_s(timeout), req.GetErrorMessage()));
+ case FRTE_RPC_CONNECTION:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR),
+ vespalib::make_string("A connection error occurred for '%s'; %s",
+ "TODO", req.GetErrorMessage()));
+ default:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR),
+ vespalib::make_string("A network error occurred for '%s'; %s",
+ "TODO", req.GetErrorMessage()));
+ }
+}
+
+}
+
+void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
+ std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
+ auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
+ if (!req->CheckReturnTypes("bixbix")) {
+ api::ReturnCode error = map_frt_error_to_storage_api_error(*req, req_ctx->_timeout);
+ LOG(info, "Client: received rpc.v1 error response: %s", error.toString().c_str());
+ auto error_reply = req_ctx->_originator_cmd->makeReply();
+ error_reply->setResult(std::move(error));
+ // TODO needs tracing of received-event!
+ _message_enqueuer.enqueue(std::move(error_reply));
+ return;
+ }
+ LOG(info, "Client: received rpc.v1 OK response");
+
+ const auto& ret = *req->GetReturn();
+ protobuf::ResponseHeader hdr;
+ if (!decode_header_from_rpc_params(ret, hdr)) {
+ assert(false); // TODO generate error reply
+ return;
+ }
+ std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
+ uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
+ });
+ // TODO the reply wrapper does lazy deserialization. Can we/should we ever defer?
+ auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing
+ assert(reply);
+
+ if (!hdr.trace_payload().empty()) {
+ req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ }
+ reply->getTrace().swap(req_ctx->_originator_cmd->getTrace());
+
+ // TODO ensure that no implicit long-lived refs end up pointing into RPC memory...!
+ req->DiscardBlobs();
+ _message_enqueuer.enqueue(std::move(reply));
+}
+
+/*
+ * Major TODOs:
+ * - tracing and trace propagation
+ * - forwards/backwards compatibility
+ * - lifetime semantics of FRT targets vs requests created from them?
+ * - lifetime of document type/fieldset repos vs messages
+ * - is repo ref squirreled away into the messages anywhere?
+ * - everything else! :3
+ */
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
new file mode 100644
index 00000000000..414c1d4e93c
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -0,0 +1,75 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpc_target.h"
+#include <vespa/fnet/frt/invokable.h>
+#include <vespa/fnet/frt/invoker.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <atomic>
+#include <functional>
+#include <memory>
+
+class FRT_RPCRequest;
+class FRT_Target;
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class LoadTypeSet; }
+
+namespace storage {
+
+class MessageEnqueuer;
+
+namespace api {
+class StorageCommand;
+class StorageMessage;
+class StorageMessageAddress;
+class StorageReply;
+}
+
+namespace rpc {
+
+class SharedRpcResources;
+class CachingRpcTargetResolver;
+
+class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait {
+ MessageEnqueuer& _message_enqueuer;
+ SharedRpcResources& _rpc_resources;
+public:
+ StorageApiRpcService(MessageEnqueuer& messageEnqueuer,
+ SharedRpcResources& rpc_resources,
+ // TODO temporary!
+ std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func,
+ std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func);
+ ~StorageApiRpcService() override;
+
+ void RPC_rpc_v1_send(FRT_RPCRequest* req);
+ void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
+ void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
+private:
+ // TODO dedupe
+ void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
+
+ struct RpcRequestContext {
+ std::shared_ptr<api::StorageCommand> _originator_cmd;
+ std::chrono::nanoseconds _timeout;
+
+ RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd, std::chrono::nanoseconds timeout)
+ : _originator_cmd(std::move(cmd)),
+ _timeout(timeout)
+ {}
+ };
+
+ std::function<std::shared_ptr<const document::DocumentTypeRepo>()> _doctype_repo_func;
+ std::function<std::shared_ptr<documentapi::LoadTypeSet>()> _loadtype_set_func;
+ std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
+
+ void register_server_methods(SharedRpcResources&);
+ template <typename PayloadCodecCallback>
+ void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback);
+ template <typename MessageType>
+ void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params);
+ void RequestDone(FRT_RPCRequest* request) override;
+};
+
+} // rpc
+} // storage
diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp
index 5f3ff86fe8c..2a083a2d704 100644
--- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp
+++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp
@@ -7,13 +7,13 @@
namespace storage {
RPCRequestWrapper::RPCRequestWrapper(FRT_RPCRequest *req)
- : _req(req)
+ : _req(req)
{
}
RPCRequestWrapper::~RPCRequestWrapper()
{
- if (_req != 0) {
+ if (_req) {
_req->SetError(ERR_REQUEST_DELETED, "Request deleted without having been replied to");
_req->Return();
}
@@ -22,7 +22,7 @@ RPCRequestWrapper::~RPCRequestWrapper()
const char *
RPCRequestWrapper::getParam() const
{
- assert(_req != 0);
+ assert(_req);
return _req->GetParams()->GetValue(0)._data._buf;
}
@@ -30,7 +30,7 @@ RPCRequestWrapper::getParam() const
uint32_t
RPCRequestWrapper::getParamLen() const
{
- assert(_req != 0);
+ assert(_req);
return _req->GetParams()->GetValue(0)._data._len;
}
@@ -38,26 +38,26 @@ RPCRequestWrapper::getParamLen() const
void
RPCRequestWrapper::returnData(const char *pt, uint32_t len)
{
- assert(_req != 0);
+ assert(_req);
_req->GetReturn()->AddData(pt, len);
_req->Return();
- _req = 0;
+ _req = nullptr;
}
void
RPCRequestWrapper::returnError(uint32_t errorCode, const char *errorMessage)
{
- assert(_req != 0);
+ assert(_req);
_req->SetError(errorCode, errorMessage);
_req->Return();
- _req = 0;
+ _req = nullptr;
}
void
RPCRequestWrapper::addReturnString(const char *str, uint32_t len)
{
- assert(_req != 0);
+ assert(_req);
if (len !=0) {
_req->GetReturn()->AddString(str, len);
} else {
@@ -68,16 +68,16 @@ RPCRequestWrapper::addReturnString(const char *str, uint32_t len)
void
RPCRequestWrapper::addReturnInt(uint32_t value)
{
- assert(_req != 0);
+ assert(_req);
_req->GetReturn()->AddInt32(value);
}
void
RPCRequestWrapper::returnRequest()
{
- assert(_req != 0);
+ assert(_req);
_req->Return();
- _req = 0;
+ _req = nullptr;
}
@@ -89,7 +89,7 @@ RPCRequestWrapper::getMethodName() const {
void
RPCRequestWrapper::discardBlobs()
{
- if (_req != 0) {
+ if (_req) {
_req->DiscardBlobs();
}
}
diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
index 8412e911601..e07163613ea 100644
--- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
+++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h
@@ -50,6 +50,8 @@ public:
**/
void returnError(uint32_t errorCode, const char *errorMessage);
+ FRT_RPCRequest* raw_request() noexcept { return _req; }
+
const char *getMethodName() const;
void addReturnString(const char *str, uint32_t len=0);
void addReturnInt(uint32_t value);
@@ -66,7 +68,7 @@ private:
RPCRequestWrapper(const RPCRequestWrapper &);
RPCRequestWrapper &operator=(const RPCRequestWrapper &);
- FRT_RPCRequest *_req; // underlying RPC request
+ FRT_RPCRequest* _req; // underlying RPC request
};
} // namespace storage