summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-16 09:43:57 +0200
committerGitHub <noreply@github.com>2020-09-16 09:43:57 +0200
commita6b848e49d1aef3758b923f858186ba254ba4ec7 (patch)
treef6a1f42c8cd7021a7f16eaf8654856585709e322
parentc95320e171e0a36f8a1f5886dc131bddd7970020 (diff)
parentf635509f5060bd919f6fcd9c2dce7a65ce321f6c (diff)
Merge pull request #14416 from vespa-engine/vekterli/extend-rpc-tests-and-improve-error-messages
Extend RPC tests and improve auto-generated error messages
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp112
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/.gitignore2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp74
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h10
7 files changed, 164 insertions, 42 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 01022895c0e..6b1bad11fa7 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -16,8 +16,12 @@
#include <vespa/storageapi/message/persistence.h>
#include <tests/common/testhelper.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/host_name.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <gmock/gmock.h>
#include <condition_variable>
#include <deque>
+#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
@@ -34,6 +38,7 @@ namespace storage::rpc {
namespace {
constexpr std::chrono::duration message_timeout = 60s;
+constexpr std::chrono::duration slobrok_register_timeout = 60s;
class LockingMockOperationDispatcher : public MessageDispatcher {
using MessageQueueType = std::deque<std::shared_ptr<api::StorageMessage>>;
@@ -125,8 +130,12 @@ public:
}
void wait_until_visible_in_slobrok(vespalib::stringref id) {
+ const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout;
while (_shared_rpc_resources->slobrok_mirror().lookup(id).empty()) {
- std::this_thread::sleep_for(10ms); // TODO timeout handling
+ if (std::chrono::steady_clock::now() > deadline) {
+ throw std::runtime_error("Timed out waiting for node to be visible in Slobrok");
+ }
+ std::this_thread::sleep_for(10ms);
}
}
@@ -139,10 +148,18 @@ public:
return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100);
}
+ void send_request_verify_not_bounced(std::shared_ptr<api::StorageCommand> req) {
+ if (!_messages.empty()) {
+ throw std::runtime_error("Node had pending messages before send");
+ }
+ _service->send_rpc_v1_request(std::move(req));
+ if (!_messages.empty()) {
+ throw std::runtime_error("RPC request was bounced. Most likely due to missing Slobrok mapping");
+ }
+ }
+
void send_request(std::shared_ptr<api::StorageCommand> req) {
- ASSERT_TRUE(_messages.empty());
_service->send_rpc_v1_request(std::move(req));
- ASSERT_TRUE(_messages.empty()); // If non-empty, request was bounced (Slobrok lookup failed)
}
// TODO move StorageTransportContext away from communicationmanager.h
@@ -179,6 +196,44 @@ struct StorageApiRpcServiceTest : Test {
_node_1->wait_until_visible_in_slobrok(to_slobrok_id(_node_0->node_address()));
}
~StorageApiRpcServiceTest() override;
+
+ static api::StorageMessageAddress non_existing_address() {
+ return make_address(100, false);
+ }
+
+ [[nodiscard]] std::shared_ptr<api::PutCommand> send_and_receive_put_command_at_node_1(
+ const std::function<void(api::PutCommand&)>& req_mutator) {
+ auto cmd = _node_0->create_dummy_put_command();
+ cmd->setAddress(_node_1->node_address());
+ req_mutator(*cmd);
+ _node_0->send_request_verify_not_bounced(cmd);
+
+ auto recv_msg = _node_1->wait_and_receive_single_message();
+ auto recv_as_put = std::dynamic_pointer_cast<api::PutCommand>(recv_msg);
+ assert(recv_as_put);
+ return recv_as_put;
+ }
+ [[nodiscard]] std::shared_ptr<api::PutCommand> send_and_receive_put_command_at_node_1() {
+ return send_and_receive_put_command_at_node_1([]([[maybe_unused]] auto& cmd){});
+ }
+
+ [[nodiscard]] std::shared_ptr<api::PutReply> respond_and_receive_put_reply_at_node_0(
+ const std::shared_ptr<api::PutCommand>& cmd,
+ const std::function<void(api::StorageReply&)>& reply_mutator) {
+ auto reply = std::shared_ptr<api::StorageReply>(cmd->makeReply());
+ reply_mutator(*reply);
+ _node_1->send_response(reply);
+
+ auto recv_reply = _node_0->wait_and_receive_single_message();
+ auto recv_as_put_reply = std::dynamic_pointer_cast<api::PutReply>(recv_reply);
+ assert(recv_as_put_reply);
+ return recv_as_put_reply;
+ }
+
+ [[nodiscard]] std::shared_ptr<api::PutReply> respond_and_receive_put_reply_at_node_0(
+ const std::shared_ptr<api::PutCommand>& cmd) {
+ return respond_and_receive_put_reply_at_node_0(cmd, []([[maybe_unused]] auto& reply){});
+ }
};
StorageApiRpcServiceTest::~StorageApiRpcServiceTest() = default;
@@ -186,7 +241,7 @@ StorageApiRpcServiceTest::~StorageApiRpcServiceTest() = default;
TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
auto cmd = _node_0->create_dummy_put_command();
cmd->setAddress(_node_1->node_address());
- ASSERT_NO_FATAL_FAILURE(_node_0->send_request(cmd));
+ _node_0->send_request_verify_not_bounced(cmd);
auto recv_msg = _node_1->wait_and_receive_single_message();
auto* put_cmd = dynamic_cast<api::PutCommand*>(recv_msg.get());
@@ -199,4 +254,53 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
ASSERT_TRUE(put_reply != nullptr);
}
+TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) {
+ auto cmd = _node_0->create_dummy_put_command();
+ cmd->setAddress(non_existing_address());
+ _node_0->send_request(cmd);
+
+ auto bounced_msg = _node_0->wait_and_receive_single_message();
+ auto* put_reply = dynamic_cast<api::PutReply*>(bounced_msg.get());
+ ASSERT_TRUE(put_reply != nullptr);
+
+ auto expected_code = static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE);
+ auto expected_msg = vespalib::make_string(
+ "The address of service '%s' could not be resolved. It is not currently "
+ "registered with the Vespa name server. "
+ "The service must be having problems, or the routing configuration is wrong. "
+ "Address resolution attempted from host '%s'",
+ to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str());
+
+ EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg));
+}
+
+TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(7);
+ cmd.setTimeout(1337s);
+ });
+ EXPECT_EQ(recv_cmd->getTrace().getLevel(), 7);
+ EXPECT_EQ(recv_cmd->getTimeout(), 1337s);
+}
+
+TEST_F(StorageApiRpcServiceTest, response_trace_is_propagated_to_sender) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(1);
+ });
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd, [](auto& reply){
+ reply.getTrace().trace(1, "Doing cool things", false);
+ });
+ auto trace_str = recv_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("Doing cool things"));
+}
+
+TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_set) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1();
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd, [](auto& reply){
+ reply.getTrace().trace(1, "Doing cool things", false);
+ });
+ auto trace_str = recv_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things")));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/.gitignore b/storage/src/vespa/storage/storageserver/rpc/.gitignore
index 3221e7351af..d3594ec97d6 100644
--- a/storage/src/vespa/storage/storageserver/rpc/.gitignore
+++ b/storage/src/vespa/storage/storageserver/rpc/.gitignore
@@ -1,2 +1,2 @@
*.pb.h
-*.pb.cc \ No newline at end of file
+*.pb.cc
diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
index be0e83e89cc..21498d66781 100644
--- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
@@ -22,4 +22,4 @@ vespa_add_library(storage_storageserver_rpc OBJECT
storage_api_rpc_service.cpp
${storage_storageserver_rpc_PROTOBUF_SRCS}
DEPENDS
-) \ No newline at end of file
+)
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 2c1989767e8..d21f32aa623 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -8,6 +8,7 @@
#include <vespa/slobrok/sbregister.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <cassert>
#include <chrono>
@@ -66,6 +67,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
_slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, config_uri)),
_slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config_uri)),
_target_factory(std::make_unique<RpcTargetFactoryImpl>(*_orb)),
+ _hostname(vespalib::HostName::get()),
_rpc_server_port(rpc_server_port),
_shutdown(false)
{
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index bd682612d20..722d1fd3a81 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -25,6 +25,7 @@ class SharedRpcResources {
std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register;
std::unique_ptr<slobrok::api::MirrorAPI> _slobrok_mirror;
std::unique_ptr<RpcTargetFactoryImpl> _target_factory;
+ vespalib::string _hostname;
vespalib::string _handle;
int _rpc_server_port;
bool _shutdown;
@@ -45,6 +46,9 @@ public:
void shutdown();
[[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started
+ // Hostname of host node is running on.
+ [[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; }
+
const RpcTargetFactory& target_factory() const;
private:
void wait_until_slobrok_is_ready();
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 5e974eb4cce..acc5472a9a9 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -187,11 +187,8 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
assert(cmd->getAddress() != nullptr);
auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress());
if (!target) {
- // TODO need to bounce reply with TODO descriptive error code, mirroring that of RPCNetwork!
auto reply = cmd->makeReply();
- reply->setResult(api::ReturnCode(
- static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE),
- "Couldn't find the darn thing in Slobrok")); // TODO :D
+ reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
// Always dispatch async for synchronously generated replies, or we risk nuking the
// stack if the reply receiver keeps resending synchronously as well.
_message_dispatcher.dispatch_async(std::move(reply));
@@ -210,43 +207,17 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
const auto timeout = cmd->getTimeout();
// TODO verify it's fine that we alloc this on the request stash and use it this way
- auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd), timeout);
+ auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd));
req->SetContext(FNET_Context(&req_ctx));
target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
}
-namespace {
-
-api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, vespalib::duration timeout) {
- // TODO determine all codes that must be (re)mapped. Current remapping adapted from RPCSend
- // TODO need to keep enough state to give peer ID/spec in error messages
- switch (req.GetErrorCode()) {
- case FRTE_RPC_TIMEOUT:
- return api::ReturnCode(
- static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT),
- vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s",
- "TODO", vespalib::to_s(timeout), req.GetErrorMessage()));
- case FRTE_RPC_CONNECTION:
- return api::ReturnCode(
- static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR),
- vespalib::make_string("A connection error occurred for '%s'; %s",
- "TODO", req.GetErrorMessage()));
- default:
- return api::ReturnCode(
- static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR),
- vespalib::make_string("A network error occurred for '%s'; %s",
- "TODO", req.GetErrorMessage()));
- }
-}
-
-}
-
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
if (!req->CheckReturnTypes("bixbix")) {
- api::ReturnCode error = map_frt_error_to_storage_api_error(*req, req_ctx->_timeout);
+ api::ReturnCode error = map_frt_error_to_storage_api_error(*req, *req_ctx);
LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str());
auto error_reply = req_ctx->_originator_cmd->makeReply();
error_reply->setResult(std::move(error));
@@ -280,10 +251,49 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
_message_dispatcher.dispatch_sync(std::move(reply));
}
+api::ReturnCode
+StorageApiRpcService::map_frt_error_to_storage_api_error(FRT_RPCRequest& req,
+ const RpcRequestContext& req_ctx) {
+ // TODO determine all codes that must be (re)mapped. Current remapping is adapted from RPCSend
+ const auto& cmd = *req_ctx._originator_cmd;
+ auto target_service = CachingRpcTargetResolver::address_to_slobrok_id(*cmd.getAddress());
+ switch (req.GetErrorCode()) {
+ case FRTE_RPC_TIMEOUT:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT),
+ vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s",
+ target_service.c_str(), vespalib::to_s(cmd.getTimeout()), req.GetErrorMessage()));
+ case FRTE_RPC_CONNECTION:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR),
+ vespalib::make_string("A connection error occurred for '%s'; %s",
+ target_service.c_str(), req.GetErrorMessage()));
+ default:
+ return api::ReturnCode(
+ static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR),
+ vespalib::make_string("A network error occurred for '%s'; %s",
+ target_service.c_str(), req.GetErrorMessage()));
+ }
+}
+
+api::ReturnCode
+StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessageAddress& addr) const {
+ auto error_code = static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE);
+ auto error_msg = vespalib::make_string(
+ "The address of service '%s' could not be resolved. It is not currently "
+ "registered with the Vespa name server. "
+ "The service must be having problems, or the routing configuration is wrong. "
+ "Address resolution attempted from host '%s'",
+ CachingRpcTargetResolver::address_to_slobrok_id(addr).c_str(),
+ _rpc_resources.hostname().c_str());
+ return api::ReturnCode(error_code, std::move(error_msg));
+}
+
/*
* Major TODOs:
* - tracing and trace propagation
* - forwards/backwards compatibility
+ * - what to remap bounced Not Found errors to internally?
* - lifetime semantics of FRT targets vs requests created from them?
* - lifetime of document type/fieldset repos vs messages
* - is repo ref squirreled away into the messages anywhere?
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 7ffa111fa60..4385228bb5c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -4,6 +4,7 @@
#include "rpc_target.h"
#include <vespa/fnet/frt/invokable.h>
#include <vespa/fnet/frt/invoker.h>
+#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/vespalib/stllike/string.h>
#include <atomic>
#include <memory>
@@ -51,11 +52,9 @@ private:
struct RpcRequestContext {
std::shared_ptr<api::StorageCommand> _originator_cmd;
- std::chrono::nanoseconds _timeout;
- RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd, std::chrono::nanoseconds timeout)
- : _originator_cmd(std::move(cmd)),
- _timeout(timeout)
+ explicit RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd)
+ : _originator_cmd(std::move(cmd))
{}
};
@@ -65,6 +64,9 @@ private:
template <typename MessageType>
void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params);
void RequestDone(FRT_RPCRequest* request) override;
+
+ api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
+ api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const;
};
} // rpc