diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-10-02 14:20:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-02 14:20:14 +0200 |
commit | cee18a8e0b1431949514dd4733b8dc728e5c56d6 (patch) | |
tree | 097acd5abd96b6b6ba7fbba54d1dbb343e7defcf | |
parent | 1a980f57d890cc67a3eb81aa5da92c42751a5201 (diff) | |
parent | beb68987952c1aab68d4146f9253cd27210c2e27 (diff) |
Merge pull request #14660 from vespa-engine/vekterli/add-rpc-trace-events
Add StorageAPI RPC trace events
4 files changed, 60 insertions, 12 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 7fb33dc242b..8228ceb5e79 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) { TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) { auto cmd = _node_0->create_dummy_put_command(); cmd->setAddress(non_existing_address()); + cmd->getTrace().setLevel(9); _node_0->send_request(cmd); auto bounced_msg = _node_0->wait_and_receive_single_message(); @@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str()); EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg)); + EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems")); } TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) { @@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) { // TODO also test bad response header/payload +TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) { + auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){ + cmd.getTrace().setLevel(9); + }); + auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd); + auto trace_str = recv_reply->getTrace().toString(); + // Ordering of traced events matter, so we use a cheeky regex. + EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+" + "Request received at.+" + "Sending response from.+" + "Response received at")); +} + } diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index 722d1fd3a81..740277218c3 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -48,6 +48,7 @@ public: // Hostname of host node is running on. [[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; } + [[nodiscard]] const vespalib::string handle() const noexcept { return _handle; } const RpcTargetFactory& target_factory() const; private: diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index a8cdc0bfeea..8b5c7706510 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -16,12 +16,14 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/trace/tracelevel.h> #include <cassert> #include <vespa/log/log.h> LOG_SETUP(".storage.storage_api_rpc_service"); using vespalib::compression::CompressionConfig; +using vespalib::TraceLevel; namespace storage::rpc { @@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { scmd->getTrace().setLevel(hdr.trace_level()); scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms())); req->DiscardBlobs(); + if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + scmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload", + _rpc_resources.handle().c_str(), + _rpc_resources.hostname().c_str(), + _rpc_resources.listen_port(), + uncompressed_size)); + } detach_and_forward_to_enqueuer(std::move(scmd), req); } else { req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload"); } } -void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) { +void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) { LOG(spam, "Server: encoding rpc.v1 response header and payload"); auto* ret = request.GetReturn(); + if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + reply.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str())); + } // TODO skip encoding header altogether if no relevant fields set? protobuf::ResponseHeader hdr; if (reply.getTrace().getLevel() > 0) { @@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma if (!target) { auto reply = cmd->makeReply(); reply->setResult(make_no_address_for_service_error(*cmd->getAddress())); + if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage()); + } // Always dispatch async for synchronously generated replies, or we risk nuking the // stack if the reply receiver keeps resending synchronously as well. _message_dispatcher.dispatch_async(std::move(reply)); return; } + if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds", + _rpc_resources.handle().c_str(), + CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(), + target->_spec.c_str(), vespalib::to_s(cmd->getTimeout()))); + } std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); req->SetMethodName(rpc_v1_method_name()); @@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); + auto& cmd = *req_ctx->_originator_cmd; if (!req->CheckReturnTypes("bixbix")) { handle_request_done_rpc_error(*req, *req_ctx); return; } LOG(spam, "Client: received rpc.v1 OK response"); - const auto& ret = *req->GetReturn(); protobuf::ResponseHeader hdr; if (!decode_header_from_rpc_params(ret, hdr)) { @@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { return; } std::unique_ptr<mbusprot::StorageReply> wrapped_reply; - bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { + uint32_t uncompressed_size = 0; + bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) { wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd); + uncompressed_size = payload.size(); }); if (!ok) { assert(!wrapped_reply); @@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { assert(reply); if (!hdr.trace_payload().empty()) { - req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + } + if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Response received at '%s' with %u bytes of payload", + _rpc_resources.handle().c_str(), + uncompressed_size)); } - reply->getTrace().swap(req_ctx->_originator_cmd->getTrace()); + reply->getTrace().swap(cmd.getTrace()); + reply->setApproxByteSize(uncompressed_size); // TODO ensure that no implicit long-lived refs end up pointing into RPC memory...! req->DiscardBlobs(); @@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) { auto error_reply = cmd.makeReply(); - LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'", + LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'", cmd.getAddress()->toString().c_str(), error.toString().c_str()); + error_reply->getTrace().swap(cmd.getTrace()); + if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage()); + } error_reply->setResult(std::move(error)); - // TODO needs tracing of received-event! _message_dispatcher.dispatch_sync(std::move(error_reply)); } @@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc( /* * Major TODOs: - * - tracing and trace propagation - * - forwards/backwards compatibility - * - what to remap bounced Not Found errors to internally? * - lifetime semantics of FRT targets vs requests created from them? * - lifetime of document type/fieldset repos vs messages * - is repo ref squirreled away into the messages anywhere? diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 39508e51841..cb2344ccd13 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -58,14 +58,13 @@ public: [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; void RPC_rpc_v1_send(FRT_RPCRequest* req); - void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); + void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply); void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); static constexpr const char* rpc_v1_method_name() noexcept { return "storageapi.v1.send"; } private: - // TODO dedupe void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); struct RpcRequestContext { |