summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Geir Storli <geirst@verizonmedia.com> 2020-09-28 12:24:17 +0200
committer GitHub <noreply@github.com> 2020-09-28 12:24:17 +0200
commit fdb0903eb467ddd32414a00e454aac05b53be857 (patch)
tree 9cd7dc5a4dca943ae2bdcb6f9ceb6857091e9098 /storage
parent 0da8512ff26b2aa8e3e0fc2a4e046b993910172f (diff)
parent deefb5fac00e0cad03910f3ac926ac8c03842367 (diff)
Merge pull request #14561 from vespa-engine/vekterli/auto-fallback-to-mbus-if-direct-rpc-not-supported
Automatically fall back to MessageBus if direct storage RPC is not supported
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp 75
-rw-r--r-- storage/src/vespa/storage/storageserver/communicationmanager.cpp 2
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp 43
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h 7
4 files changed, 109 insertions, 18 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 69259ee08ec..ee70f265297 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -96,18 +96,18 @@ vespalib::string to_slobrok_id(const api::StorageMessageAddress& address) {
return CachingRpcTargetResolver::address_to_slobrok_id(address);
}
-class StorageApiNode {
+class RpcNode {
+protected:
vdstestlib::DirConfig _config;
std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo;
std::shared_ptr<const documentapi::LoadTypeSet> _load_type_set;
LockingMockOperationDispatcher _messages;
std::unique_ptr<MessageCodecProvider> _codec_provider;
std::unique_ptr<SharedRpcResources> _shared_rpc_resources;
- std::unique_ptr<StorageApiRpcService> _service;
api::StorageMessageAddress _node_address;
vespalib::string _slobrok_id;
public:
- StorageApiNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
+ RpcNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
: _config(getStandardConfig(true)),
_doc_type_repo(document::TestDocRepo().getTypeRepoSp()),
_load_type_set(std::make_shared<documentapi::LoadTypeSet>()),
@@ -122,13 +122,10 @@ public:
_shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1);
// TODO make codec provider into interface so we can test decode-failures more easily?
_codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo, _load_type_set);
- StorageApiRpcService::Params params;
- _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params);
-
- _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
- // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
- wait_until_visible_in_slobrok(_slobrok_id);
}
+ ~RpcNode();
+
+ const api::StorageMessageAddress& node_address() const noexcept { return _node_address; }
void wait_until_visible_in_slobrok(vespalib::stringref id) {
const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout;
@@ -139,8 +136,24 @@ public:
std::this_thread::sleep_for(10ms);
}
}
+};
- const api::StorageMessageAddress& node_address() const noexcept { return _node_address; }
+RpcNode::~RpcNode() = default;
+
+class StorageApiNode : public RpcNode {
+ std::unique_ptr<StorageApiRpcService> _service;
+public:
+ StorageApiNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
+ : RpcNode(node_index, is_distributor, slobrok)
+ {
+ StorageApiRpcService::Params params;
+ _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params);
+
+ _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
+ // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
+ wait_until_visible_in_slobrok(_slobrok_id);
+ }
+ ~StorageApiNode();
std::shared_ptr<api::PutCommand> create_dummy_put_command() const {
auto doc_type = _doc_type_repo->getDocumentType("testdoctype1");
@@ -177,6 +190,26 @@ public:
_messages.wait_until_n_messages_received(1);
return _messages.pop_first_message();
}
+
+ bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept {
+ return _service->target_supports_direct_rpc(addr);
+ }
+};
+
+StorageApiNode::~StorageApiNode() {
+ // Ensure we shut down the underlying RPC threads before destroying
+ // the RPC service that may receive callbacks from them.
+ _shared_rpc_resources->shutdown();
+}
+
+struct NodeWithoutStorageApiService : RpcNode {
+ NodeWithoutStorageApiService(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
+ : RpcNode(node_index, is_distributor, slobrok)
+ {
+ _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
+ // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
+ wait_until_visible_in_slobrok(_slobrok_id);
+ }
};
} // anonymous namespace
@@ -304,4 +337,26 @@ TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_s
EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things")));
}
+TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_supported) {
+ NodeWithoutStorageApiService dummy_node(10, false, _slobrok);
+ _node_0->wait_until_visible_in_slobrok(to_slobrok_id(dummy_node.node_address()));
+
+ // Initially we assume targets are on a new enough version to understand storage API RPCs.
+ EXPECT_TRUE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
+ EXPECT_TRUE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
+
+ // Send to an endpoint exposing RPC but not the Storage API server method.
+ // It will bounce back immediately with an FRT "no such method" error.
+ auto cmd = _node_0->create_dummy_put_command();
+ cmd->setAddress(dummy_node.node_address());
+ _node_0->send_request(cmd);
+ auto bounced_msg = _node_0->wait_and_receive_single_message();
+ ASSERT_TRUE(bounced_msg);
+
+ // For now (and for the sake of simplicity), fall back to assuming no targets
+ // support direct storage API RPC.
+ EXPECT_FALSE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
+ EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 99fdb97e435..667c577645a 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -575,7 +575,7 @@ CommunicationManager::sendCommand(
case api::StorageMessageAddress::STORAGE:
{
LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
- if (_use_direct_storageapi_rpc) {
+ if (_use_direct_storageapi_rpc && _storage_api_rpc_service->target_supports_direct_rpc(address)) {
_storage_api_rpc_service->send_rpc_v1_request(msg);
} else {
auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index c234f623d45..c02f0c56092 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -33,7 +33,8 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
_rpc_resources(rpc_resources),
_message_codec_provider(message_codec_provider),
_params(params),
- _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory()))
+ _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())),
+ _direct_rpc_supported(true)
{
register_server_methods(rpc_resources);
}
@@ -225,12 +226,7 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
if (!req->CheckReturnTypes("bixbix")) {
- api::ReturnCode error = map_frt_error_to_storage_api_error(*req, *req_ctx);
- LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str());
- auto error_reply = req_ctx->_originator_cmd->makeReply();
- error_reply->setResult(std::move(error));
- // TODO needs tracing of received-event!
- _message_dispatcher.dispatch_sync(std::move(error_reply));
+ handle_request_done_rpc_error(*req, *req_ctx);
return;
}
LOG(debug, "Client: received rpc.v1 OK response");
@@ -259,6 +255,22 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
_message_dispatcher.dispatch_sync(std::move(reply));
}
+void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req,
+ const RpcRequestContext& req_ctx) {
+ auto error_reply = req_ctx._originator_cmd->makeReply();
+ api::ReturnCode error;
+ if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
+ mark_peer_without_direct_rpc_support(*req_ctx._originator_cmd->getAddress());
+ error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported");
+ } else {
+ error = map_frt_error_to_storage_api_error(req, req_ctx);
+ }
+ LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str());
+ error_reply->setResult(std::move(error));
+ // TODO needs tracing of received-event!
+ _message_dispatcher.dispatch_sync(std::move(error_reply));
+}
+
api::ReturnCode
StorageApiRpcService::map_frt_error_to_storage_api_error(FRT_RPCRequest& req,
const RpcRequestContext& req_ctx) {
@@ -297,6 +309,23 @@ StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessag
return api::ReturnCode(error_code, std::move(error_msg));
}
+void StorageApiRpcService::mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr) {
+ bool expected = true;
+ if (_direct_rpc_supported.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+ LOG(info, "Node %s does not support direct Storage API RPC; falling back "
+ "to legacy MessageBus protocol. Not logging this for any further nodes",
+ addr.toString().c_str());
+ }
+}
+
+bool StorageApiRpcService::target_supports_direct_rpc(
+ [[maybe_unused]] const api::StorageMessageAddress& addr) const noexcept {
+ // Stale reads aren't an issue here, since the worst case is just receiving
+ // a few more "no such method" errors.
+ return _direct_rpc_supported.load(std::memory_order_relaxed);
+}
+
+
/*
* Major TODOs:
* - tracing and trace propagation
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 3fca08acc15..c8152ebfdbd 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -47,6 +47,7 @@ private:
MessageCodecProvider& _message_codec_provider;
const Params _params;
std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
+ std::atomic<bool> _direct_rpc_supported;
public:
StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
@@ -54,6 +55,8 @@ public:
const Params& params);
~StorageApiRpcService() override;
+ [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
+
void RPC_rpc_v1_send(FRT_RPCRequest* req);
void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
@@ -76,8 +79,12 @@ private:
void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params);
void RequestDone(FRT_RPCRequest* request) override;
+ void handle_request_done_rpc_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
+
api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const;
+
+ void mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr);
};
} // rpc