aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-10-01 14:22:37 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-10-01 14:22:37 +0000
commitbeb68987952c1aab68d4146f9253cd27210c2e27 (patch)
tree3da6ade441a3cd197b704236838dbc81aafcc007 /storage
parent04fc2f93d585562f4759fb070957ff44684f15f4 (diff)
Add StorageAPI RPC trace events
Trace event set should roughly match that of MessageBus, but contains more information (such as hostnames and payload size) for debugging purposes.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp53
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h3
4 files changed, 60 insertions, 12 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 7fb33dc242b..8228ceb5e79 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) {
auto cmd = _node_0->create_dummy_put_command();
cmd->setAddress(non_existing_address());
+ cmd->getTrace().setLevel(9);
_node_0->send_request(cmd);
auto bounced_msg = _node_0->wait_and_receive_single_message();
@@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl
to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str());
EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg));
+ EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems"));
}
TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) {
@@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) {
// TODO also test bad response header/payload
+TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(9);
+ });
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd);
+ auto trace_str = recv_reply->getTrace().toString();
+ // The ordering of the traced events matters, so all four are matched, in sequence, by a single regex.
+ EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+"
+ "Request received at.+"
+ "Sending response from.+"
+ "Response received at"));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 722d1fd3a81..740277218c3 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -48,6 +48,7 @@ public:
// Hostname of host node is running on.
[[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; }
+ // Returns the stored handle by reference (no copy, so nothing can throw), matching hostname().
+ // NOTE(review): assumes _handle is a data member that lives as long as this object - confirm.
+ [[nodiscard]] const vespalib::string& handle() const noexcept { return _handle; }
const RpcTargetFactory& target_factory() const;
private:
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index a8cdc0bfeea..8b5c7706510 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -16,12 +16,14 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/trace/tracelevel.h>
#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".storage.storage_api_rpc_service");
using vespalib::compression::CompressionConfig;
+using vespalib::TraceLevel;
namespace storage::rpc {
@@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
scmd->getTrace().setLevel(hdr.trace_level());
scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms()));
req->DiscardBlobs();
+ if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ scmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ _rpc_resources.hostname().c_str(),
+ _rpc_resources.listen_port(),
+ uncompressed_size));
+ }
detach_and_forward_to_enqueuer(std::move(scmd), req);
} else {
req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload");
}
}
-void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
+void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) {
LOG(spam, "Server: encoding rpc.v1 response header and payload");
auto* ret = request.GetReturn();
+ if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str()));
+ }
// TODO skip encoding header altogether if no relevant fields set?
protobuf::ResponseHeader hdr;
if (reply.getTrace().getLevel() > 0) {
@@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
+ if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage());
+ }
// Always dispatch async for synchronously generated replies, or we risk nuking the
// stack if the reply receiver keeps resending synchronously as well.
_message_dispatcher.dispatch_async(std::move(reply));
return;
}
+ if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
+ _rpc_resources.handle().c_str(),
+ CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
+ target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ }
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
+ auto& cmd = *req_ctx->_originator_cmd;
if (!req->CheckReturnTypes("bixbix")) {
handle_request_done_rpc_error(*req, *req_ctx);
return;
}
LOG(spam, "Client: received rpc.v1 OK response");
-
const auto& ret = *req->GetReturn();
protobuf::ResponseHeader hdr;
if (!decode_header_from_rpc_params(ret, hdr)) {
@@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
return;
}
std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
- bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ uint32_t uncompressed_size = 0;
+ bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) {
wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
+ uncompressed_size = payload.size();
});
if (!ok) {
assert(!wrapped_reply);
@@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
assert(reply);
if (!hdr.trace_payload().empty()) {
- req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ }
+ if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Response received at '%s' with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ uncompressed_size));
}
- reply->getTrace().swap(req_ctx->_originator_cmd->getTrace());
+ reply->getTrace().swap(cmd.getTrace());
+ reply->setApproxByteSize(uncompressed_size);
// TODO ensure that no implicit long-lived refs end up pointing into RPC memory...!
req->DiscardBlobs();
@@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont
void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) {
auto error_reply = cmd.makeReply();
- LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'",
+ LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'",
cmd.getAddress()->toString().c_str(), error.toString().c_str());
+ error_reply->getTrace().swap(cmd.getTrace());
+ if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage());
+ }
error_reply->setResult(std::move(error));
- // TODO needs tracing of received-event!
_message_dispatcher.dispatch_sync(std::move(error_reply));
}
@@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc(
/*
* Major TODOs:
- * - tracing and trace propagation
- * - forwards/backwards compatibility
- * - what to remap bounced Not Found errors to internally?
* - lifetime semantics of FRT targets vs requests created from them?
* - lifetime of document type/fieldset repos vs messages
* - is repo ref squirreled away into the messages anywhere?
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 39508e51841..cb2344ccd13 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -58,14 +58,13 @@ public:
[[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
- void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
+ void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
static constexpr const char* rpc_v1_method_name() noexcept {
return "storageapi.v1.send";
}
private:
- // TODO dedupe
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
struct RpcRequestContext {