diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-30 23:00:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-30 23:00:02 +0200 |
commit | bbabb4d40ba374d0e34ac89d0352b8e564ef565d (patch) | |
tree | 999df147c83e25064600e635d2c53fba2f335fde /storage | |
parent | 4386de75fc03a9b568bd5e0632f336b3557833c2 (diff) | |
parent | 95a586154dacbfd9acdc604f8e63b282194743ea (diff) |
Merge pull request #14642 from vespa-engine/vekterli/gracefully-handle-rpc-decode-failures
Gracefully handle RPC header/payload decode failures
Diffstat (limited to 'storage')
4 files changed, 102 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index ee70f265297..7fb33dc242b 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -4,6 +4,8 @@ #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> @@ -126,6 +128,8 @@ public: ~RpcNode(); const api::StorageMessageAddress& node_address() const noexcept { return _node_address; } + const SharedRpcResources& shared_rpc_resources() const noexcept { return *_shared_rpc_resources; } + SharedRpcResources& shared_rpc_resources() noexcept { return *_shared_rpc_resources; } void wait_until_visible_in_slobrok(vespalib::stringref id) { const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout; @@ -194,6 +198,19 @@ public: bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept { return _service->target_supports_direct_rpc(addr); } + + void send_raw_request_and_expect_error(StorageApiNode& node, + FRT_RPCRequest* req, + const vespalib::string& expected_msg) { + auto spec = vespalib::make_string("tcp/localhost:%d", node.shared_rpc_resources().listen_port()); + auto* target = _shared_rpc_resources->supervisor().GetTarget(spec.c_str()); + target->InvokeSync(req, 60.0); + EXPECT_TRUE(req->IsError()); + EXPECT_EQ(req->GetErrorCode(), FRTE_RPC_METHOD_FAILED); + EXPECT_EQ(req->GetErrorMessage(), expected_msg); + target->SubRef(); + req->SubRef(); + } }; StorageApiNode::~StorageApiNode() { @@ -359,4 +376,36 @@ TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_support EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address())); } +TEST_F(StorageApiRpcServiceTest, malformed_request_header_returns_rpc_error) { + auto& supervisor = _node_0->shared_rpc_resources().supervisor(); + auto* req = supervisor.AllocRPCRequest(); + req->SetMethodName(StorageApiRpcService::rpc_v1_method_name()); + auto* params = req->GetParams(); + params->AddInt8(0); // No compression + params->AddInt32(24); + strncpy(params->AddData(24), "some non protobuf stuff", 24); + params->AddInt8(0); // Still no compression + params->AddInt32(0); // Not actually valid, but we'll try to decode the header first. + params->AddData(0); + + _node_0->send_raw_request_and_expect_error(*_node_1, req, "Unable to decode RPC request header protobuf"); +} + +TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) { + auto& supervisor = _node_0->shared_rpc_resources().supervisor(); + auto* req = supervisor.AllocRPCRequest(); + req->SetMethodName(StorageApiRpcService::rpc_v1_method_name()); + auto* params = req->GetParams(); + params->AddInt8(0); // No compression + params->AddInt32(0); + params->AddData(0); // This is a valid empty protobuf header with no fields set + params->AddInt8(0); // Even still no compression + params->AddInt32(0); // This, however, isn't valid, since at least sizeof(uint32_t) must be present + params->AddData(0); + + _node_0->send_raw_request_and_expect_error(*_node_1, req, "Unable to decode RPC request payload"); +} + +// TODO also test bad response header/payload + } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 667c577645a..86c987a15c5 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -638,7 +638,7 @@ CommunicationManager::sendDirectRPCReply( { std::string_view requestName(request.getMethodName()); // TODO non-name based dispatch // TODO rework this entire dispatch mechanism :D - if (requestName == "storageapi.v1.send") { + if (requestName == rpc::StorageApiRpcService::rpc_v1_method_name()) { _storage_api_rpc_service->encode_rpc_v1_response(*request.raw_request(), *reply); } else if (requestName == "getnodestate3") { auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index c02f0c56092..a8cdc0bfeea 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -47,7 +47,7 @@ StorageApiRpcService::Params::~Params() = default; void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) { FRT_ReflectionBuilder rb(&rpc_resources.supervisor()); - rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this); + rb.DefineMethod(rpc_v1_method_name(), "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this); rb.MethodDesc("V1 of StorageAPI direct RPC protocol"); rb.ParamDesc("header_encoding", "0=raw, 6=lz4"); rb.ParamDesc("header_decoded_size", "Uncompressed header blob size"); @@ -132,7 +132,7 @@ void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& ms } template <typename PayloadCodecCallback> -void StorageApiRpcService::uncompress_rpc_payload( +bool StorageApiRpcService::uncompress_rpc_payload( const FRT_Values& params, PayloadCodecCallback payload_callback) { @@ -146,25 +146,32 @@ void StorageApiRpcService::uncompress_rpc_payload( assert(uncompressed_length <= UINT32_MAX); auto wrapped_codec = _message_codec_provider.wrapped_codec(); - payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length)); + try { + payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length)); + } catch (std::exception& e) { + LOG(debug, "Caught exception during decode callback: '%s'", e.what()); + return false; + } + return true; } void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { - LOG(debug, "Server: received rpc.v1 request"); + LOG(spam, "Server: received rpc.v1 request"); // TODO do we need to manually check the parameter/return spec here? const auto& params = *req->GetParams(); protobuf::RequestHeader hdr; if (!decode_header_from_rpc_params(params, hdr)) { - req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request header protobuf"); + req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request header protobuf"); return; } std::unique_ptr<mbusprot::StorageCommand> cmd; uint32_t uncompressed_size = 0; - uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) { + bool ok = uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) { cmd = codec.decodeCommand(payload); uncompressed_size = static_cast<uint32_t>(payload.size()); }); - if (cmd && cmd->has_command()) { + if (ok) { + assert(cmd && cmd->has_command()); auto scmd = cmd->steal_command(); scmd->setApproxByteSize(uncompressed_size); scmd->getTrace().setLevel(hdr.trace_level()); @@ -172,12 +179,12 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { req->DiscardBlobs(); detach_and_forward_to_enqueuer(std::move(scmd), req); } else { - req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request payload"); + req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload"); } } void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) { - LOG(debug, "Server: encoding rpc.v1 response header and payload"); + LOG(spam, "Server: encoding rpc.v1 response header and payload"); auto* ret = request.GetReturn(); // TODO skip encoding header altogether if no relevant fields set? @@ -191,7 +198,8 @@ void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const } void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd) { - LOG(debug, "Client: sending rpc.v1 request for message of type %s", cmd->getType().getName().c_str()); + LOG(spam, "Client: sending rpc.v1 request for message of type %s to %s", + cmd->getType().getName().c_str(), cmd->getAddress()->toString().c_str()); assert(cmd->getAddress() != nullptr); auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress()); @@ -204,7 +212,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma return; } std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); - req->SetMethodName("storageapi.v1.send"); + req->SetMethodName(rpc_v1_method_name()); protobuf::RequestHeader req_hdr; req_hdr.set_time_remaining_ms(std::chrono::duration_cast<std::chrono::milliseconds>(cmd->getTimeout()).count()); @@ -229,18 +237,23 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { handle_request_done_rpc_error(*req, *req_ctx); return; } - LOG(debug, "Client: received rpc.v1 OK response"); + LOG(spam, "Client: received rpc.v1 OK response"); const auto& ret = *req->GetReturn(); protobuf::ResponseHeader hdr; if (!decode_header_from_rpc_params(ret, hdr)) { - assert(false); // TODO generate error reply + handle_request_done_decode_error(*req_ctx, "Failed to decode RPC response header protobuf"); return; } std::unique_ptr<mbusprot::StorageReply> wrapped_reply; - uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { + bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd); }); + if (!ok) { + assert(!wrapped_reply); + handle_request_done_decode_error(*req_ctx, "Failed to decode RPC response payload"); + return; + } // TODO the reply wrapper does lazy deserialization. Can we/should we ever defer? auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing assert(reply); @@ -257,15 +270,29 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx) { - auto error_reply = req_ctx._originator_cmd->makeReply(); + auto& cmd = *req_ctx._originator_cmd; api::ReturnCode error; if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) { - mark_peer_without_direct_rpc_support(*req_ctx._originator_cmd->getAddress()); + mark_peer_without_direct_rpc_support(*cmd.getAddress()); error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported"); } else { error = map_frt_error_to_storage_api_error(req, req_ctx); } - LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str()); + create_and_dispatch_error_reply(cmd, std::move(error)); +} + +void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestContext& req_ctx, + vespalib::stringref description) { + auto& cmd = *req_ctx._originator_cmd; + assert(cmd.has_transport_context()); // Otherwise, reply already (destructively) generated by codec + create_and_dispatch_error_reply(cmd, api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::DECODE_ERROR), description)); +} + +void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) { + auto error_reply = cmd.makeReply(); + LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'", + cmd.getAddress()->toString().c_str(), error.toString().c_str()); error_reply->setResult(std::move(error)); // TODO needs tracing of received-event! _message_dispatcher.dispatch_sync(std::move(error_reply)); diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index c8152ebfdbd..39508e51841 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -60,6 +60,10 @@ public: void RPC_rpc_v1_send(FRT_RPCRequest* req); void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); + + static constexpr const char* rpc_v1_method_name() noexcept { + return "storageapi.v1.send"; + } private: // TODO dedupe void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); @@ -74,12 +78,15 @@ private: void register_server_methods(SharedRpcResources&); template <typename PayloadCodecCallback> - void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback); + [[nodiscard]] bool uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback); template <typename MessageType> void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params); void RequestDone(FRT_RPCRequest* request) override; void handle_request_done_rpc_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); + void handle_request_done_decode_error(const RpcRequestContext& req_ctx, + vespalib::stringref description); + void create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error); api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const; |