summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-10-02 14:20:14 +0200
committerGitHub <noreply@github.com>2020-10-02 14:20:14 +0200
commitcee18a8e0b1431949514dd4733b8dc728e5c56d6 (patch)
tree097acd5abd96b6b6ba7fbba54d1dbb343e7defcf
parent1a980f57d890cc67a3eb81aa5da92c42751a5201 (diff)
parentbeb68987952c1aab68d4146f9253cd27210c2e27 (diff)
Merge pull request #14660 from vespa-engine/vekterli/add-rpc-trace-events
Add StorageAPI RPC trace events
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp53
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h3
4 files changed, 60 insertions, 12 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 7fb33dc242b..8228ceb5e79 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) {
auto cmd = _node_0->create_dummy_put_command();
cmd->setAddress(non_existing_address());
+ cmd->getTrace().setLevel(9);
_node_0->send_request(cmd);
auto bounced_msg = _node_0->wait_and_receive_single_message();
@@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl
to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str());
EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg));
+ EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems"));
}
TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) {
@@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) {
// TODO also test bad response header/payload
+TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(9);
+ });
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd);
+ auto trace_str = recv_reply->getTrace().toString();
+ // Ordering of traced events matter, so we use a cheeky regex.
+ EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+"
+ "Request received at.+"
+ "Sending response from.+"
+ "Response received at"));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 722d1fd3a81..740277218c3 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -48,6 +48,7 @@ public:
// Hostname of host node is running on.
[[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; }
+ [[nodiscard]] const vespalib::string handle() const noexcept { return _handle; }
const RpcTargetFactory& target_factory() const;
private:
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index a8cdc0bfeea..8b5c7706510 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -16,12 +16,14 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/trace/tracelevel.h>
#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".storage.storage_api_rpc_service");
using vespalib::compression::CompressionConfig;
+using vespalib::TraceLevel;
namespace storage::rpc {
@@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
scmd->getTrace().setLevel(hdr.trace_level());
scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms()));
req->DiscardBlobs();
+ if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ scmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ _rpc_resources.hostname().c_str(),
+ _rpc_resources.listen_port(),
+ uncompressed_size));
+ }
detach_and_forward_to_enqueuer(std::move(scmd), req);
} else {
req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload");
}
}
-void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
+void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) {
LOG(spam, "Server: encoding rpc.v1 response header and payload");
auto* ret = request.GetReturn();
+ if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str()));
+ }
// TODO skip encoding header altogether if no relevant fields set?
protobuf::ResponseHeader hdr;
if (reply.getTrace().getLevel() > 0) {
@@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
+ if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage());
+ }
// Always dispatch async for synchronously generated replies, or we risk nuking the
// stack if the reply receiver keeps resending synchronously as well.
_message_dispatcher.dispatch_async(std::move(reply));
return;
}
+ if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
+ _rpc_resources.handle().c_str(),
+ CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
+ target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ }
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
+ auto& cmd = *req_ctx->_originator_cmd;
if (!req->CheckReturnTypes("bixbix")) {
handle_request_done_rpc_error(*req, *req_ctx);
return;
}
LOG(spam, "Client: received rpc.v1 OK response");
-
const auto& ret = *req->GetReturn();
protobuf::ResponseHeader hdr;
if (!decode_header_from_rpc_params(ret, hdr)) {
@@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
return;
}
std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
- bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ uint32_t uncompressed_size = 0;
+ bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) {
wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
+ uncompressed_size = payload.size();
});
if (!ok) {
assert(!wrapped_reply);
@@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
assert(reply);
if (!hdr.trace_payload().empty()) {
- req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ }
+ if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Response received at '%s' with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ uncompressed_size));
}
- reply->getTrace().swap(req_ctx->_originator_cmd->getTrace());
+ reply->getTrace().swap(cmd.getTrace());
+ reply->setApproxByteSize(uncompressed_size);
// TODO ensure that no implicit long-lived refs end up pointing into RPC memory...!
req->DiscardBlobs();
@@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont
void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) {
auto error_reply = cmd.makeReply();
- LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'",
+ LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'",
cmd.getAddress()->toString().c_str(), error.toString().c_str());
+ error_reply->getTrace().swap(cmd.getTrace());
+ if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage());
+ }
error_reply->setResult(std::move(error));
- // TODO needs tracing of received-event!
_message_dispatcher.dispatch_sync(std::move(error_reply));
}
@@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc(
/*
* Major TODOs:
- * - tracing and trace propagation
- * - forwards/backwards compatibility
- * - what to remap bounced Not Found errors to internally?
* - lifetime semantics of FRT targets vs requests created from them?
* - lifetime of document type/fieldset repos vs messages
* - is repo ref squirreled away into the messages anywhere?
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 39508e51841..cb2344ccd13 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -58,14 +58,13 @@ public:
[[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
- void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
+ void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
static constexpr const char* rpc_v1_method_name() noexcept {
return "storageapi.v1.send";
}
private:
- // TODO dedupe
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
struct RpcRequestContext {