summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2020-09-30 23:00:02 +0200
committer GitHub <noreply@github.com> 2020-09-30 23:00:02 +0200
commit bbabb4d40ba374d0e34ac89d0352b8e564ef565d (patch)
tree 999df147c83e25064600e635d2c53fba2f335fde /storage
parent 4386de75fc03a9b568bd5e0632f336b3557833c2 (diff)
parent 95a586154dacbfd9acdc604f8e63b282194743ea (diff)
Merge pull request #14642 from vespa-engine/vekterli/gracefully-handle-rpc-decode-failures
Gracefully handle RPC header/payload decode failures
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp 49
-rw-r--r-- storage/src/vespa/storage/storageserver/communicationmanager.cpp 2
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp 61
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h 9
4 files changed, 102 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index ee70f265297..7fb33dc242b 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -4,6 +4,8 @@
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
@@ -126,6 +128,8 @@ public:
~RpcNode();
// Read-only access to the address stored in _node_address.
const api::StorageMessageAddress& node_address() const noexcept { return _node_address; }
+ const SharedRpcResources& shared_rpc_resources() const noexcept { return *_shared_rpc_resources; } // Const overload: read-only view of the object _shared_rpc_resources points to.
+ SharedRpcResources& shared_rpc_resources() noexcept { return *_shared_rpc_resources; } // Mutable overload, used by the tests to reach the supervisor.
void wait_until_visible_in_slobrok(vespalib::stringref id) {
const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout;
@@ -194,6 +198,19 @@ public:
// Forwards the query for `addr` to the wrapped service and returns its answer unchanged.
bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept {
return _service->target_supports_direct_rpc(addr);
}
+
+    // Synchronously invokes the hand-built request `req` against the RPC listen port
+    // of `node` (via localhost) and expects it to come back as an error with code
+    // FRTE_RPC_METHOD_FAILED and message `expected_msg`.
+    // One reference is dropped on both the target and `req` before returning.
+    void send_raw_request_and_expect_error(StorageApiNode& node,
+                                           FRT_RPCRequest* req,
+                                           const vespalib::string& expected_msg) {
+        const auto peer_port = node.shared_rpc_resources().listen_port();
+        auto peer_spec = vespalib::make_string("tcp/localhost:%d", peer_port);
+        auto& local_supervisor = _shared_rpc_resources->supervisor();
+        auto* rpc_target = local_supervisor.GetTarget(peer_spec.c_str());
+
+        rpc_target->InvokeSync(req, 60.0);
+
+        EXPECT_TRUE(req->IsError());
+        EXPECT_EQ(req->GetErrorCode(), FRTE_RPC_METHOD_FAILED);
+        EXPECT_EQ(req->GetErrorMessage(), expected_msg);
+
+        rpc_target->SubRef();
+        req->SubRef();
+    }
};
StorageApiNode::~StorageApiNode() {
@@ -359,4 +376,36 @@ TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_support
EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
}
+TEST_F(StorageApiRpcServiceTest, malformed_request_header_returns_rpc_error) {
+ auto& supervisor = _node_0->shared_rpc_resources().supervisor();
+ auto* req = supervisor.AllocRPCRequest();
+ req->SetMethodName(StorageApiRpcService::rpc_v1_method_name());
+ auto* params = req->GetParams();
+ params->AddInt8(0); // No compression
+ params->AddInt32(24);
+ strncpy(params->AddData(24), "some non protobuf stuff", 24);
+ params->AddInt8(0); // Still no compression
+ params->AddInt32(0); // Not actually valid, but we'll try to decode the header first.
+ params->AddData(0);
+
+ _node_0->send_raw_request_and_expect_error(*_node_1, req, "Unable to decode RPC request header protobuf");
+}
+
+TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) {
+ auto& supervisor = _node_0->shared_rpc_resources().supervisor();
+ auto* req = supervisor.AllocRPCRequest();
+ req->SetMethodName(StorageApiRpcService::rpc_v1_method_name());
+ auto* params = req->GetParams();
+ params->AddInt8(0); // No compression
+ params->AddInt32(0);
+ params->AddData(0); // This is a valid empty protobuf header with no fields set
+ params->AddInt8(0); // Even still no compression
+ params->AddInt32(0); // This, however, isn't valid, since at least sizeof(uint32_t) must be present
+ params->AddData(0);
+
+ _node_0->send_raw_request_and_expect_error(*_node_1, req, "Unable to decode RPC request payload");
+}
+
+// TODO also test bad response header/payload
+
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 667c577645a..86c987a15c5 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -638,7 +638,7 @@ CommunicationManager::sendDirectRPCReply(
{
std::string_view requestName(request.getMethodName()); // TODO non-name based dispatch
// TODO rework this entire dispatch mechanism :D
- if (requestName == "storageapi.v1.send") {
+ if (requestName == rpc::StorageApiRpcService::rpc_v1_method_name()) {
_storage_api_rpc_service->encode_rpc_v1_response(*request.raw_request(), *reply);
} else if (requestName == "getnodestate3") {
auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply));
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index c02f0c56092..a8cdc0bfeea 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -47,7 +47,7 @@ StorageApiRpcService::Params::~Params() = default;
void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) {
FRT_ReflectionBuilder rb(&rpc_resources.supervisor());
- rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this);
+ rb.DefineMethod(rpc_v1_method_name(), "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this);
rb.MethodDesc("V1 of StorageAPI direct RPC protocol");
rb.ParamDesc("header_encoding", "0=raw, 6=lz4");
rb.ParamDesc("header_decoded_size", "Uncompressed header blob size");
@@ -132,7 +132,7 @@ void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& ms
}
template <typename PayloadCodecCallback>
-void StorageApiRpcService::uncompress_rpc_payload(
+bool StorageApiRpcService::uncompress_rpc_payload(
const FRT_Values& params,
PayloadCodecCallback payload_callback)
{
@@ -146,25 +146,32 @@ void StorageApiRpcService::uncompress_rpc_payload(
assert(uncompressed_length <= UINT32_MAX);
auto wrapped_codec = _message_codec_provider.wrapped_codec();
- payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length));
+ try {
+ payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length));
+ } catch (std::exception& e) {
+ LOG(debug, "Caught exception during decode callback: '%s'", e.what());
+ return false;
+ }
+ return true;
}
void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
- LOG(debug, "Server: received rpc.v1 request");
+ LOG(spam, "Server: received rpc.v1 request");
// TODO do we need to manually check the parameter/return spec here?
const auto& params = *req->GetParams();
protobuf::RequestHeader hdr;
if (!decode_header_from_rpc_params(params, hdr)) {
- req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request header protobuf");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request header protobuf");
return;
}
std::unique_ptr<mbusprot::StorageCommand> cmd;
uint32_t uncompressed_size = 0;
- uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) {
+ bool ok = uncompress_rpc_payload(params, [&cmd, &uncompressed_size](auto& codec, auto payload) {
cmd = codec.decodeCommand(payload);
uncompressed_size = static_cast<uint32_t>(payload.size());
});
- if (cmd && cmd->has_command()) {
+ if (ok) {
+ assert(cmd && cmd->has_command());
auto scmd = cmd->steal_command();
scmd->setApproxByteSize(uncompressed_size);
scmd->getTrace().setLevel(hdr.trace_level());
@@ -172,12 +179,12 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
req->DiscardBlobs();
detach_and_forward_to_enqueuer(std::move(scmd), req);
} else {
- req->SetError(FRTE_RPC_BAD_REQUEST, "Unable to decode RPC request payload");
+ req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload");
}
}
void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
- LOG(debug, "Server: encoding rpc.v1 response header and payload");
+ LOG(spam, "Server: encoding rpc.v1 response header and payload");
auto* ret = request.GetReturn();
// TODO skip encoding header altogether if no relevant fields set?
@@ -191,7 +198,8 @@ void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const
}
void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd) {
- LOG(debug, "Client: sending rpc.v1 request for message of type %s", cmd->getType().getName().c_str());
+ LOG(spam, "Client: sending rpc.v1 request for message of type %s to %s",
+ cmd->getType().getName().c_str(), cmd->getAddress()->toString().c_str());
assert(cmd->getAddress() != nullptr);
auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress());
@@ -204,7 +212,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
return;
}
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
- req->SetMethodName("storageapi.v1.send");
+ req->SetMethodName(rpc_v1_method_name());
protobuf::RequestHeader req_hdr;
req_hdr.set_time_remaining_ms(std::chrono::duration_cast<std::chrono::milliseconds>(cmd->getTimeout()).count());
@@ -229,18 +237,23 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
handle_request_done_rpc_error(*req, *req_ctx);
return;
}
- LOG(debug, "Client: received rpc.v1 OK response");
+ LOG(spam, "Client: received rpc.v1 OK response");
const auto& ret = *req->GetReturn();
protobuf::ResponseHeader hdr;
if (!decode_header_from_rpc_params(ret, hdr)) {
- assert(false); // TODO generate error reply
+ handle_request_done_decode_error(*req_ctx, "Failed to decode RPC response header protobuf");
return;
}
std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
- uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
});
+ if (!ok) {
+ assert(!wrapped_reply);
+ handle_request_done_decode_error(*req_ctx, "Failed to decode RPC response payload");
+ return;
+ }
// TODO the reply wrapper does lazy deserialization. Can we/should we ever defer?
auto reply = wrapped_reply->getInternalMessage(); // TODO message stealing
assert(reply);
@@ -257,15 +270,29 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
// Turns a failed RPC request into an error for the originating command.
// FRTE_RPC_NO_SUCH_METHOD is special-cased: the peer is marked as lacking direct RPC
// support and the error becomes NOT_CONNECTED. Every other FRT error code goes through
// map_frt_error_to_storage_api_error(). After this change the resulting error is handed
// to create_and_dispatch_error_reply() instead of being logged and dispatched inline.
void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req,
const RpcRequestContext& req_ctx) {
- auto error_reply = req_ctx._originator_cmd->makeReply();
+ auto& cmd = *req_ctx._originator_cmd;
api::ReturnCode error;
if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
- mark_peer_without_direct_rpc_support(*req_ctx._originator_cmd->getAddress());
+ mark_peer_without_direct_rpc_support(*cmd.getAddress());
error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported");
} else {
error = map_frt_error_to_storage_api_error(req, req_ctx);
}
- LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str());
+ create_and_dispatch_error_reply(cmd, std::move(error));
+}
+
+// Reports a response that could not be decoded back to the sender of the originating
+// command: builds a ReturnCode carrying mbus::ErrorCode::DECODE_ERROR plus `description`
+// and passes it on to create_and_dispatch_error_reply().
+void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestContext& req_ctx,
+                                                            vespalib::stringref description) {
+    auto& cmd = *req_ctx._originator_cmd;
+    // If the transport context is gone, the codec has already (destructively) produced a reply.
+    assert(cmd.has_transport_context());
+    const auto decode_error_code = static_cast<api::ReturnCode::Result>(mbus::ErrorCode::DECODE_ERROR);
+    api::ReturnCode decode_error(decode_error_code, description);
+    create_and_dispatch_error_reply(cmd, std::move(decode_error));
+}
+
+void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) {
+ auto error_reply = cmd.makeReply();
+ LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'",
+ cmd.getAddress()->toString().c_str(), error.toString().c_str());
error_reply->setResult(std::move(error));
// TODO needs tracing of received-event!
_message_dispatcher.dispatch_sync(std::move(error_reply));
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index c8152ebfdbd..39508e51841 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -60,6 +60,10 @@ public:
void RPC_rpc_v1_send(FRT_RPCRequest* req);
void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
+
+    // Single source of truth for the V1 StorageAPI RPC method name; used both when
+    // registering the server method and when building client requests.
+    static constexpr const char* rpc_v1_method_name() noexcept { return "storageapi.v1.send"; }
private:
// TODO dedupe
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
@@ -74,12 +78,15 @@ private:
void register_server_methods(SharedRpcResources&);
template <typename PayloadCodecCallback>
- void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback);
+ [[nodiscard]] bool uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback);
template <typename MessageType>
void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params);
void RequestDone(FRT_RPCRequest* request) override;
void handle_request_done_rpc_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
+ void handle_request_done_decode_error(const RpcRequestContext& req_ctx,
+ vespalib::stringref description);
+ void create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error);
api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const;