diff options
Diffstat (limited to 'storage/src')
5 files changed, 26 insertions, 150 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index cd80244af87..8b8801f9cf5 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -196,10 +196,6 @@ public: return _messages.pop_first_message(); } - bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept { - return _service->target_supports_direct_rpc(addr); - } - void send_raw_request_and_expect_error(StorageApiNode& node, FRT_RPCRequest* req, const vespalib::string& expected_msg) { @@ -220,16 +216,6 @@ StorageApiNode::~StorageApiNode() { _shared_rpc_resources->shutdown(); } -struct NodeWithoutStorageApiService : RpcNode { - NodeWithoutStorageApiService(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok) - : RpcNode(node_index, is_distributor, slobrok) - { - _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id); - // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough. - wait_until_visible_in_slobrok(_slobrok_id); - } -}; - } // anonymous namespace // TODO consider completely mocking Slobrok to avoid any race conditions during node registration @@ -357,28 +343,6 @@ TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_s EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things"))); } -TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_supported) { - NodeWithoutStorageApiService dummy_node(10, false, _slobrok); - _node_0->wait_until_visible_in_slobrok(to_slobrok_id(dummy_node.node_address())); - - // Initially we assume targets are on a new enough version to understand storage API RPCs. - EXPECT_TRUE(_node_0->target_supports_direct_rpc(dummy_node.node_address())); - EXPECT_TRUE(_node_0->target_supports_direct_rpc(_node_1->node_address())); - - // Send to an endpoint exposing RPC but not the Storage API server method. - // It will bounce back immediately with an FRT "no such method" error. - auto cmd = _node_0->create_dummy_put_command(); - cmd->setAddress(dummy_node.node_address()); - _node_0->send_request(cmd); - auto bounced_msg = _node_0->wait_and_receive_single_message(); - ASSERT_TRUE(bounced_msg); - - // For now (and for the sake of simplicity), fall back to assuming no targets - // support direct storage API RPC. - EXPECT_FALSE(_node_0->target_supports_direct_rpc(dummy_node.node_address())); - EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address())); -} - TEST_F(StorageApiRpcServiceTest, malformed_request_header_returns_rpc_error) { auto& supervisor = _node_0->shared_rpc_resources().supervisor(); auto* req = supervisor.AllocRPCRequest(); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 5a3ebebcd9e..4c375a30867 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -35,10 +35,6 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::Do : _docAPIMsg(std::move(msg)) { } -StorageTransportContext::StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg) - : _storageProtocolMsg(std::move(msg)) -{ } - StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request) : _request(std::move(request)) { } @@ -105,18 +101,6 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr))); process(std::move(cmd)); - } else if (protocolName == mbusprot::StorageProtocol::NAME) { - std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release())); - - assert(storMsgPtr); - - //TODO: Can it be moved ? - std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand(); - cmd->setTimeout(storMsgPtr->getTimeRemaining()); - cmd->setTrace(storMsgPtr->steal_trace()); - cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr))); - - process(std::move(cmd)); } else { LOGBM(warning, "Received unsupported message type %d for protocol '%s'", msg->getType(), msg->getProtocol().c_str()); @@ -145,26 +129,6 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) const vespalib::string& protocolName = message->getProtocol(); if (protocolName == documentapi::DocumentProtocol::NAME) { convertedReply = static_cast<documentapi::DocumentMessage &>(*message).createReply(); - } else if (protocolName == mbusprot::StorageProtocol::NAME) { - std::shared_ptr<api::StorageReply> repl(static_cast<mbusprot::StorageCommand &>(*message).getCommand()->makeReply()); - auto sreply = std::make_unique<mbusprot::StorageReply>(repl); - - if (reply->hasErrors()) { - // Convert only the first error since storageapi only - // supports one return code. - uint32_t mbuscode = reply->getError(0).getCode(); - api::ReturnCode::Result code((api::ReturnCode::Result) mbuscode); - // Encode mbuscode into message not to lose it - sreply->getReply()->setResult(storage::api::ReturnCode( - code, - mbus::ErrorCode::getName(mbuscode) - + vespalib::string(": ") - + reply->getError(0).getMessage() - + vespalib::string(" (from ") - + reply->getError(0).getService() - + vespalib::string(")"))); - } - convertedReply = std::move(sreply); } else { LOG(warning, "Received reply of unhandled protocol '%s'", protocolName.c_str()); return; @@ -414,9 +378,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> // config here. auto documentTypeRepo = _component.getTypeRepo()->documentTypeRepo; _mbus = std::make_unique<mbus::RPCMessageBus>( - mbus::ProtocolSet() - .add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo)) - .add(std::make_shared<mbusprot::StorageProtocol>(documentTypeRepo)), + mbus::ProtocolSet().add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo)), params, _configUri); @@ -554,23 +516,13 @@ CommunicationManager::sendCommand( framework::MilliSecTimer startTime(_component.getClock()); switch (address.getProtocol()) { - case api::StorageMessageAddress::Protocol::STORAGE: + case api::StorageMessageAddress::Protocol::STORAGE: { LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); - if (_storage_api_rpc_service->target_supports_direct_rpc(address)) { - _storage_api_rpc_service->send_rpc_v1_request(msg); - } else { - auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); - - cmd->setContext(mbus::Context(msg->getMsgId())); - cmd->setRetryEnabled(false); - cmd->setTimeRemaining(msg->getTimeout()); - cmd->setTrace(msg->steal_trace()); - sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route()); - } + _storage_api_rpc_service->send_rpc_v1_request(msg); break; } - case api::StorageMessageAddress::Protocol::DOCUMENT: + case api::StorageMessageAddress::Protocol::DOCUMENT: { MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converting storageapi message to documentapi"); @@ -666,47 +618,32 @@ CommunicationManager::sendMessageBusReply( mbus::Reply::UP replyUP; LOG(spam, "Sending message bus reply %s", reply->toString().c_str()); + assert(context._docAPIMsg); // StorageProtocol no longer uses MessageBus carrier. - // If this was originally documentapi, create a reply now and transfer the - // state. - if (context._docAPIMsg) { - if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) { - replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage()); - replyUP->swapState(*context._docAPIMsg); - replyUP->setTrace(reply->steal_trace()); - replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - reply->getResult().getMessage())); - } else { - replyUP = context._docAPIMsg->createReply(); - replyUP->swapState(*context._docAPIMsg); - replyUP->setTrace(reply->steal_trace()); - replyUP->setMessage(std::move(context._docAPIMsg)); - _docApiConverter.transferReplyState(*reply, *replyUP); - } - } else if (context._storageProtocolMsg) { - replyUP = std::make_unique<mbusprot::StorageReply>(reply); - if (reply->getResult().getResult() != api::ReturnCode::OK) { - replyUP->addError(mbus::Error(reply->getResult().getResult(), reply->getResult().getMessage())); - } - - replyUP->swapState(*context._storageProtocolMsg); + // Create an MBus reply and transfer state to it. + if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) { + replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage()); + replyUP->swapState(*context._docAPIMsg); replyUP->setTrace(reply->steal_trace()); - replyUP->setMessage(std::move(context._storageProtocolMsg)); + replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + reply->getResult().getMessage())); + } else { + replyUP = context._docAPIMsg->createReply(); + replyUP->swapState(*context._docAPIMsg); + replyUP->setTrace(reply->steal_trace()); + replyUP->setMessage(std::move(context._docAPIMsg)); + _docApiConverter.transferReplyState(*reply, *replyUP); } - if (replyUP) { - // Forward message only if it was successfully stored in storage. - if (!replyUP->hasErrors()) { - mbus::Message::UP messageUP = replyUP->getMessage(); + if (!replyUP->hasErrors()) { + mbus::Message::UP messageUP = replyUP->getMessage(); - if (messageUP && messageUP->getRoute().hasHops()) { - messageUP->setContext(mbus::Context(FORWARDED_MESSAGE)); - _sourceSession->send(std::move(messageUP)); - } + if (messageUP && messageUP->getRoute().hasHops()) { + messageUP->setContext(mbus::Context(FORWARDED_MESSAGE)); + _sourceSession->send(std::move(messageUP)); } - - _messageBusSession->reply(std::move(replyUP)); } + _messageBusSession->reply(std::move(replyUP)); } bool @@ -779,8 +716,6 @@ void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo); std::lock_guard<std::mutex> guard(_earlierGenerationsLock); _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol))); - auto newStorageProtocol = std::make_shared<mbusprot::StorageProtocol>(repo); - _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newStorageProtocol))); } if (_message_codec_provider) { _message_codec_provider->update_atomically(repo); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index d52fb56cf20..6f953411cac 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -55,12 +55,10 @@ class RPCRequestWrapper; class StorageTransportContext : public api::TransportContext { public: explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg); - explicit StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg); explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request); ~StorageTransportContext() override; std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg; - std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg; std::unique_ptr<RPCRequestWrapper> _request; }; diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 78a9956c334..006193f0c37 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -37,8 +37,7 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher _message_codec_provider(message_codec_provider), _params(params), _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(), - params.num_rpc_targets_per_node)), - _direct_rpc_supported(true) + params.num_rpc_targets_per_node)) { register_server_methods(rpc_resources); } @@ -312,8 +311,8 @@ void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req, auto& cmd = *req_ctx._originator_cmd; api::ReturnCode error; if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) { - mark_peer_without_direct_rpc_support(*cmd.getAddress()); - error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported"); + error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Legacy MessageBus StorageAPI transport is no longer supported. " + "Old nodes must be upgraded to a newer Vespa version."); } else { error = map_frt_error_to_storage_api_error(req, req_ctx); } @@ -378,22 +377,6 @@ StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessag return api::ReturnCode(error_code, std::move(error_msg)); } -void StorageApiRpcService::mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr) { - bool expected = true; - if (_direct_rpc_supported.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { - LOG(info, "Node %s does not support direct Storage API RPC; falling back " - "to legacy MessageBus protocol. Not logging this for any further nodes", - addr.toString().c_str()); - } -} - -bool StorageApiRpcService::target_supports_direct_rpc( - [[maybe_unused]] const api::StorageMessageAddress& addr) const noexcept { - // Stale reads isn't an issue here, since the worst case is just receiving - // a few more "no such method" errors. - return _direct_rpc_supported.load(std::memory_order_relaxed); -} - bool StorageApiRpcService::address_visible_in_slobrok_uncached( const api::StorageMessageAddress& addr) const noexcept diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 2526cf5434c..3166abba956 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -47,7 +47,6 @@ private: MessageCodecProvider& _message_codec_provider; const Params _params; std::unique_ptr<CachingRpcTargetResolver> _target_resolver; - std::atomic<bool> _direct_rpc_supported; public: StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, @@ -55,7 +54,6 @@ public: const Params& params); ~StorageApiRpcService() override; - [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; // Bypasses resolver cache and returns whether local Slobrok mirror has at least 1 spec for the given address. [[nodiscard]] bool address_visible_in_slobrok_uncached(const api::StorageMessageAddress& addr) const noexcept; @@ -91,8 +89,6 @@ private: api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx); api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const; - - void mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr); }; } // rpc |