diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-09-28 12:24:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-28 12:24:17 +0200 |
commit | fdb0903eb467ddd32414a00e454aac05b53be857 (patch) | |
tree | 9cd7dc5a4dca943ae2bdcb6f9ceb6857091e9098 /storage | |
parent | 0da8512ff26b2aa8e3e0fc2a4e046b993910172f (diff) | |
parent | deefb5fac00e0cad03910f3ac926ac8c03842367 (diff) |
Merge pull request #14561 from vespa-engine/vekterli/auto-fallback-to-mbus-if-direct-rpc-not-supported
Automatically fall back to MessageBus if direct storage RPC is not supported
Diffstat (limited to 'storage')
4 files changed, 109 insertions, 18 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 69259ee08ec..ee70f265297 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -96,18 +96,18 @@ vespalib::string to_slobrok_id(const api::StorageMessageAddress& address) { return CachingRpcTargetResolver::address_to_slobrok_id(address); } -class StorageApiNode { +class RpcNode { +protected: vdstestlib::DirConfig _config; std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo; std::shared_ptr<const documentapi::LoadTypeSet> _load_type_set; LockingMockOperationDispatcher _messages; std::unique_ptr<MessageCodecProvider> _codec_provider; std::unique_ptr<SharedRpcResources> _shared_rpc_resources; - std::unique_ptr<StorageApiRpcService> _service; api::StorageMessageAddress _node_address; vespalib::string _slobrok_id; public: - StorageApiNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) + RpcNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) : _config(getStandardConfig(true)), _doc_type_repo(document::TestDocRepo().getTypeRepoSp()), _load_type_set(std::make_shared<documentapi::LoadTypeSet>()), @@ -122,13 +122,10 @@ public: _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1); // TODO make codec provider into interface so we can test decode-failures more easily? _codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo, _load_type_set); - StorageApiRpcService::Params params; - _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params); - - _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id); - // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough. 
- wait_until_visible_in_slobrok(_slobrok_id); } + ~RpcNode(); + + const api::StorageMessageAddress& node_address() const noexcept { return _node_address; } void wait_until_visible_in_slobrok(vespalib::stringref id) { const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout; @@ -139,8 +136,24 @@ public: std::this_thread::sleep_for(10ms); } } +}; - const api::StorageMessageAddress& node_address() const noexcept { return _node_address; } +RpcNode::~RpcNode() = default; + +class StorageApiNode : public RpcNode { + std::unique_ptr<StorageApiRpcService> _service; +public: + StorageApiNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) + : RpcNode(node_index, is_distributor, slobrok) + { + StorageApiRpcService::Params params; + _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params); + + _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id); + // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough. + wait_until_visible_in_slobrok(_slobrok_id); + } + ~StorageApiNode(); std::shared_ptr<api::PutCommand> create_dummy_put_command() const { auto doc_type = _doc_type_repo->getDocumentType("testdoctype1"); @@ -177,6 +190,26 @@ public: _messages.wait_until_n_messages_received(1); return _messages.pop_first_message(); } + + bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept { + return _service->target_supports_direct_rpc(addr); + } +}; + +StorageApiNode::~StorageApiNode() { + // Ensure we shut down the underlying RPC threads before destroying + // the RPC service that may receive callbacks from it. 
+ _shared_rpc_resources->shutdown(); +} + +struct NodeWithoutStorageApiService : RpcNode { + NodeWithoutStorageApiService(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) + : RpcNode(node_index, is_distributor, slobrok) + { + _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id); + // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough. + wait_until_visible_in_slobrok(_slobrok_id); + } }; } // anonymous namespace @@ -304,4 +337,26 @@ TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_s EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things"))); } +TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_supported) { + NodeWithoutStorageApiService dummy_node(10, false, _slobrok); + _node_0->wait_until_visible_in_slobrok(to_slobrok_id(dummy_node.node_address())); + + // Initially we assume targets are on a new enough version to understand storage API RPCs. + EXPECT_TRUE(_node_0->target_supports_direct_rpc(dummy_node.node_address())); + EXPECT_TRUE(_node_0->target_supports_direct_rpc(_node_1->node_address())); + + // Send to an endpoint exposing RPC but not the Storage API server method. + // It will bounce back immediately with an FRT "no such method" error. + auto cmd = _node_0->create_dummy_put_command(); + cmd->setAddress(dummy_node.node_address()); + _node_0->send_request(cmd); + auto bounced_msg = _node_0->wait_and_receive_single_message(); + ASSERT_TRUE(bounced_msg); + + // For now (and for the sake of simplicity), fall back to assuming no targets + // support direct storage API RPC. 
+ EXPECT_FALSE(_node_0->target_supports_direct_rpc(dummy_node.node_address())); + EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address())); +} + } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 99fdb97e435..667c577645a 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -575,7 +575,7 @@ CommunicationManager::sendCommand( case api::StorageMessageAddress::STORAGE: { LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); - if (_use_direct_storageapi_rpc) { + if (_use_direct_storageapi_rpc && _storage_api_rpc_service->target_supports_direct_rpc(address)) { _storage_api_rpc_service->send_rpc_v1_request(msg); } else { auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index c234f623d45..c02f0c56092 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -33,7 +33,8 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher _rpc_resources(rpc_resources), _message_codec_provider(message_codec_provider), _params(params), - _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())) + _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())), + _direct_rpc_supported(true) { register_server_methods(rpc_resources); } @@ -225,12 +226,7 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); auto* req_ctx = 
static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); if (!req->CheckReturnTypes("bixbix")) { - api::ReturnCode error = map_frt_error_to_storage_api_error(*req, *req_ctx); - LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str()); - auto error_reply = req_ctx->_originator_cmd->makeReply(); - error_reply->setResult(std::move(error)); - // TODO needs tracing of received-event! - _message_dispatcher.dispatch_sync(std::move(error_reply)); + handle_request_done_rpc_error(*req, *req_ctx); return; } LOG(debug, "Client: received rpc.v1 OK response"); @@ -259,6 +255,22 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { _message_dispatcher.dispatch_sync(std::move(reply)); } +void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req, + const RpcRequestContext& req_ctx) { + auto error_reply = req_ctx._originator_cmd->makeReply(); + api::ReturnCode error; + if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) { + mark_peer_without_direct_rpc_support(*req_ctx._originator_cmd->getAddress()); + error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported"); + } else { + error = map_frt_error_to_storage_api_error(req, req_ctx); + } + LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str()); + error_reply->setResult(std::move(error)); + // TODO needs tracing of received-event! 
+ _message_dispatcher.dispatch_sync(std::move(error_reply)); +} + api::ReturnCode StorageApiRpcService::map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx) { @@ -297,6 +309,23 @@ StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessag return api::ReturnCode(error_code, std::move(error_msg)); } +void StorageApiRpcService::mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr) { + bool expected = true; + if (_direct_rpc_supported.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + LOG(info, "Node %s does not support direct Storage API RPC; falling back " + "to legacy MessageBus protocol. Not logging this for any further nodes", + addr.toString().c_str()); + } +} + +bool StorageApiRpcService::target_supports_direct_rpc( + [[maybe_unused]] const api::StorageMessageAddress& addr) const noexcept { + // Stale reads isn't an issue here, since the worst case is just receiving + // a few more "no such method" errors. 
+ return _direct_rpc_supported.load(std::memory_order_relaxed); +} + + /* * Major TODOs: * - tracing and trace propagation diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 3fca08acc15..c8152ebfdbd 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -47,6 +47,7 @@ private: MessageCodecProvider& _message_codec_provider; const Params _params; std::unique_ptr<CachingRpcTargetResolver> _target_resolver; + std::atomic<bool> _direct_rpc_supported; public: StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, @@ -54,6 +55,8 @@ public: const Params& params); ~StorageApiRpcService() override; + [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; + void RPC_rpc_v1_send(FRT_RPCRequest* req); void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); @@ -76,8 +79,12 @@ private: void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params); void RequestDone(FRT_RPCRequest* request) override; + void handle_request_done_rpc_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); + api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const; + + void mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr); }; } // rpc |