diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-15 13:35:07 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-15 13:35:07 +0000 |
commit | f635509f5060bd919f6fcd9c2dce7a65ce321f6c (patch) | |
tree | 92276e64e8add438858fa663ccfca065fd96132e /storage | |
parent | 2d8032d1351fd620ad80fe55890eee328dfb0802 (diff) |
Extend RPC tests and improve auto-generated error messages
The improved messages should now match what MessageBus generates.
Diffstat (limited to 'storage')
7 files changed, 164 insertions, 42 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 01022895c0e..6b1bad11fa7 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -16,8 +16,12 @@ #include <vespa/storageapi/message/persistence.h> #include <tests/common/testhelper.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/host_name.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <gmock/gmock.h> #include <condition_variable> #include <deque> +#include <functional> #include <memory> #include <mutex> #include <stdexcept> @@ -34,6 +38,7 @@ namespace storage::rpc { namespace { constexpr std::chrono::duration message_timeout = 60s; +constexpr std::chrono::duration slobrok_register_timeout = 60s; class LockingMockOperationDispatcher : public MessageDispatcher { using MessageQueueType = std::deque<std::shared_ptr<api::StorageMessage>>; @@ -125,8 +130,12 @@ public: } void wait_until_visible_in_slobrok(vespalib::stringref id) { + const auto deadline = std::chrono::steady_clock::now() + slobrok_register_timeout; while (_shared_rpc_resources->slobrok_mirror().lookup(id).empty()) { - std::this_thread::sleep_for(10ms); // TODO timeout handling + if (std::chrono::steady_clock::now() > deadline) { + throw std::runtime_error("Timed out waiting for node to be visible in Slobrok"); + } + std::this_thread::sleep_for(10ms); } } @@ -139,10 +148,18 @@ public: return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100); } + void send_request_verify_not_bounced(std::shared_ptr<api::StorageCommand> req) { + if (!_messages.empty()) { + throw std::runtime_error("Node had pending messages before send"); + } + _service->send_rpc_v1_request(std::move(req)); + if (!_messages.empty()) { + throw std::runtime_error("RPC request was bounced. Most likely due to missing Slobrok mapping"); + } + } + void send_request(std::shared_ptr<api::StorageCommand> req) { - ASSERT_TRUE(_messages.empty()); _service->send_rpc_v1_request(std::move(req)); - ASSERT_TRUE(_messages.empty()); // If non-empty, request was bounced (Slobrok lookup failed) } // TODO move StorageTransportContext away from communicationmanager.h @@ -179,6 +196,44 @@ struct StorageApiRpcServiceTest : Test { _node_1->wait_until_visible_in_slobrok(to_slobrok_id(_node_0->node_address())); } ~StorageApiRpcServiceTest() override; + + static api::StorageMessageAddress non_existing_address() { + return make_address(100, false); + } + + [[nodiscard]] std::shared_ptr<api::PutCommand> send_and_receive_put_command_at_node_1( + const std::function<void(api::PutCommand&)>& req_mutator) { + auto cmd = _node_0->create_dummy_put_command(); + cmd->setAddress(_node_1->node_address()); + req_mutator(*cmd); + _node_0->send_request_verify_not_bounced(cmd); + + auto recv_msg = _node_1->wait_and_receive_single_message(); + auto recv_as_put = std::dynamic_pointer_cast<api::PutCommand>(recv_msg); + assert(recv_as_put); + return recv_as_put; + } + [[nodiscard]] std::shared_ptr<api::PutCommand> send_and_receive_put_command_at_node_1() { + return send_and_receive_put_command_at_node_1([]([[maybe_unused]] auto& cmd){}); + } + + [[nodiscard]] std::shared_ptr<api::PutReply> respond_and_receive_put_reply_at_node_0( + const std::shared_ptr<api::PutCommand>& cmd, + const std::function<void(api::StorageReply&)>& reply_mutator) { + auto reply = std::shared_ptr<api::StorageReply>(cmd->makeReply()); + reply_mutator(*reply); + _node_1->send_response(reply); + + auto recv_reply = _node_0->wait_and_receive_single_message(); + auto recv_as_put_reply = std::dynamic_pointer_cast<api::PutReply>(recv_reply); + assert(recv_as_put_reply); + return recv_as_put_reply; + } + + [[nodiscard]] std::shared_ptr<api::PutReply> respond_and_receive_put_reply_at_node_0( + const std::shared_ptr<api::PutCommand>& cmd) { + return respond_and_receive_put_reply_at_node_0(cmd, []([[maybe_unused]] auto& reply){}); + } }; StorageApiRpcServiceTest::~StorageApiRpcServiceTest() = default; @@ -186,7 +241,7 @@ StorageApiRpcServiceTest::~StorageApiRpcServiceTest() = default; TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) { auto cmd = _node_0->create_dummy_put_command(); cmd->setAddress(_node_1->node_address()); - ASSERT_NO_FATAL_FAILURE(_node_0->send_request(cmd)); + _node_0->send_request_verify_not_bounced(cmd); auto recv_msg = _node_1->wait_and_receive_single_message(); auto* put_cmd = dynamic_cast<api::PutCommand*>(recv_msg.get()); @@ -199,4 +254,53 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) { ASSERT_TRUE(put_reply != nullptr); } +TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) { + auto cmd = _node_0->create_dummy_put_command(); + cmd->setAddress(non_existing_address()); + _node_0->send_request(cmd); + + auto bounced_msg = _node_0->wait_and_receive_single_message(); + auto* put_reply = dynamic_cast<api::PutReply*>(bounced_msg.get()); + ASSERT_TRUE(put_reply != nullptr); + + auto expected_code = static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE); + auto expected_msg = vespalib::make_string( + "The address of service '%s' could not be resolved. It is not currently " + "registered with the Vespa name server. " + "The service must be having problems, or the routing configuration is wrong. " + "Address resolution attempted from host '%s'", + to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str()); + + EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg)); +} + +TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) { + auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){ + cmd.getTrace().setLevel(7); + cmd.setTimeout(1337s); + }); + EXPECT_EQ(recv_cmd->getTrace().getLevel(), 7); + EXPECT_EQ(recv_cmd->getTimeout(), 1337s); +} + +TEST_F(StorageApiRpcServiceTest, response_trace_is_propagated_to_sender) { + auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){ + cmd.getTrace().setLevel(1); + }); + auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd, [](auto& reply){ + reply.getTrace().trace(1, "Doing cool things", false); + }); + auto trace_str = recv_reply->getTrace().toString(); + EXPECT_THAT(trace_str, HasSubstr("Doing cool things")); +} + +TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_set) { + auto recv_cmd = send_and_receive_put_command_at_node_1(); + auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd, [](auto& reply){ + reply.getTrace().trace(1, "Doing cool things", false); + }); + auto trace_str = recv_reply->getTrace().toString(); + EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things"))); +} + } diff --git a/storage/src/vespa/storage/storageserver/rpc/.gitignore b/storage/src/vespa/storage/storageserver/rpc/.gitignore index 3221e7351af..d3594ec97d6 100644 --- a/storage/src/vespa/storage/storageserver/rpc/.gitignore +++ b/storage/src/vespa/storage/storageserver/rpc/.gitignore @@ -1,2 +1,2 @@ *.pb.h -*.pb.cc
\ No newline at end of file +*.pb.cc diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt index be0e83e89cc..21498d66781 100644 --- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt @@ -22,4 +22,4 @@ vespa_add_library(storage_storageserver_rpc OBJECT storage_api_rpc_service.cpp ${storage_storageserver_rpc_PROTOBUF_SRCS} DEPENDS -)
\ No newline at end of file +) diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 2c1989767e8..d21f32aa623 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -8,6 +8,7 @@ #include <vespa/slobrok/sbregister.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/stringfmt.h> #include <cassert> #include <chrono> @@ -66,6 +67,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, _slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, config_uri)), _slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config_uri)), _target_factory(std::make_unique<RpcTargetFactoryImpl>(*_orb)), + _hostname(vespalib::HostName::get()), _rpc_server_port(rpc_server_port), _shutdown(false) { diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index bd682612d20..722d1fd3a81 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -25,6 +25,7 @@ class SharedRpcResources { std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register; std::unique_ptr<slobrok::api::MirrorAPI> _slobrok_mirror; std::unique_ptr<RpcTargetFactoryImpl> _target_factory; + vespalib::string _hostname; vespalib::string _handle; int _rpc_server_port; bool _shutdown; @@ -45,6 +46,9 @@ public: void shutdown(); [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started + // Hostname of host node is running on. + [[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; } + const RpcTargetFactory& target_factory() const; private: void wait_until_slobrok_is_ready(); diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 5e974eb4cce..acc5472a9a9 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -187,11 +187,8 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma assert(cmd->getAddress() != nullptr); auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress()); if (!target) { - // TODO need to bounce reply with TODO descriptive error code, mirroring that of RPCNetwork! auto reply = cmd->makeReply(); - reply->setResult(api::ReturnCode( - static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE), - "Couldn't find the darn thing in Slobrok")); // TODO :D + reply->setResult(make_no_address_for_service_error(*cmd->getAddress())); // Always dispatch async for synchronously generated replies, or we risk nuking the // stack if the reply receiver keeps resending synchronously as well. _message_dispatcher.dispatch_async(std::move(reply)); @@ -210,43 +207,17 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma const auto timeout = cmd->getTimeout(); // TODO verify it's fine that we alloc this on the request stash and use it this way - auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd), timeout); + auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd)); req->SetContext(FNET_Context(&req_ctx)); target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this); } -namespace { - -api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, vespalib::duration timeout) { - // TODO determine all codes that must be (re)mapped. Current remapping adapted from RPCSend - // TODO need to keep enough state to give peer ID/spec in error messages - switch (req.GetErrorCode()) { - case FRTE_RPC_TIMEOUT: - return api::ReturnCode( - static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT), - vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s", - "TODO", vespalib::to_s(timeout), req.GetErrorMessage())); - case FRTE_RPC_CONNECTION: - return api::ReturnCode( - static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR), - vespalib::make_string("A connection error occurred for '%s'; %s", - "TODO", req.GetErrorMessage())); - default: - return api::ReturnCode( - static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR), - vespalib::make_string("A network error occurred for '%s'; %s", - "TODO", req.GetErrorMessage())); - } -} - -} - void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); if (!req->CheckReturnTypes("bixbix")) { - api::ReturnCode error = map_frt_error_to_storage_api_error(*req, req_ctx->_timeout); + api::ReturnCode error = map_frt_error_to_storage_api_error(*req, *req_ctx); LOG(debug, "Client: received rpc.v1 error response: %s", error.toString().c_str()); auto error_reply = req_ctx->_originator_cmd->makeReply(); error_reply->setResult(std::move(error)); @@ -280,10 +251,49 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { _message_dispatcher.dispatch_sync(std::move(reply)); } +api::ReturnCode +StorageApiRpcService::map_frt_error_to_storage_api_error(FRT_RPCRequest& req, + const RpcRequestContext& req_ctx) { + // TODO determine all codes that must be (re)mapped. Current remapping is adapted from RPCSend + const auto& cmd = *req_ctx._originator_cmd; + auto target_service = CachingRpcTargetResolver::address_to_slobrok_id(*cmd.getAddress()); + switch (req.GetErrorCode()) { + case FRTE_RPC_TIMEOUT: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::TIMEOUT), + vespalib::make_string("A timeout occurred while waiting for '%s' (%g seconds expired); %s", + target_service.c_str(), vespalib::to_s(cmd.getTimeout()), req.GetErrorMessage())); + case FRTE_RPC_CONNECTION: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::CONNECTION_ERROR), + vespalib::make_string("A connection error occurred for '%s'; %s", + target_service.c_str(), req.GetErrorMessage())); + default: + return api::ReturnCode( + static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NETWORK_ERROR), + vespalib::make_string("A network error occurred for '%s'; %s", + target_service.c_str(), req.GetErrorMessage())); + } +} + +api::ReturnCode +StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessageAddress& addr) const { + auto error_code = static_cast<api::ReturnCode::Result>(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE); + auto error_msg = vespalib::make_string( + "The address of service '%s' could not be resolved. It is not currently " + "registered with the Vespa name server. " + "The service must be having problems, or the routing configuration is wrong. " + "Address resolution attempted from host '%s'", + CachingRpcTargetResolver::address_to_slobrok_id(addr).c_str(), + _rpc_resources.hostname().c_str()); + return api::ReturnCode(error_code, std::move(error_msg)); +} + /* * Major TODOs: * - tracing and trace propagation * - forwards/backwards compatibility + * - what to remap bounced Not Found errors to internally? * - lifetime semantics of FRT targets vs requests created from them? * - lifetime of document type/fieldset repos vs messages * - is repo ref squirreled away into the messages anywhere? diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 7ffa111fa60..4385228bb5c 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -4,6 +4,7 @@ #include "rpc_target.h" #include <vespa/fnet/frt/invokable.h> #include <vespa/fnet/frt/invoker.h> +#include <vespa/storageapi/messageapi/returncode.h> #include <vespa/vespalib/stllike/string.h> #include <atomic> #include <memory> @@ -51,11 +52,9 @@ private: struct RpcRequestContext { std::shared_ptr<api::StorageCommand> _originator_cmd; - std::chrono::nanoseconds _timeout; - RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd, std::chrono::nanoseconds timeout) - : _originator_cmd(std::move(cmd)), - _timeout(timeout) + explicit RpcRequestContext(std::shared_ptr<api::StorageCommand> cmd) + : _originator_cmd(std::move(cmd)) {} }; @@ -65,6 +64,9 @@ private: template <typename MessageType> void encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params); void RequestDone(FRT_RPCRequest* request) override; + + api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); + api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const; }; } // rpc |