summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-06-30 14:20:23 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-06-30 14:20:23 +0000
commitdce075d17a732068c33c9efbe77fd443e9c34105 (patch)
treeeaae5b804813a8fc0b163eb9fc27df41694bc605 /storage
parent3e66f4bb7f70c3135b720b7d5e39db8c0b24fd8b (diff)
Remove legacy MessageBus StorageAPI transport fallback
Direct P2P RPC has been the preferred way for years, and we don't need the fallback now that we're on Vespa 8.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp36
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp111
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp23
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h4
5 files changed, 26 insertions, 150 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index cd80244af87..8b8801f9cf5 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -196,10 +196,6 @@ public:
return _messages.pop_first_message();
}
- bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept {
- return _service->target_supports_direct_rpc(addr);
- }
-
void send_raw_request_and_expect_error(StorageApiNode& node,
FRT_RPCRequest* req,
const vespalib::string& expected_msg) {
@@ -220,16 +216,6 @@ StorageApiNode::~StorageApiNode() {
_shared_rpc_resources->shutdown();
}
-struct NodeWithoutStorageApiService : RpcNode {
- NodeWithoutStorageApiService(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
- : RpcNode(node_index, is_distributor, slobrok)
- {
- _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
- // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
- wait_until_visible_in_slobrok(_slobrok_id);
- }
-};
-
} // anonymous namespace
// TODO consider completely mocking Slobrok to avoid any race conditions during node registration
@@ -357,28 +343,6 @@ TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_s
EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things")));
}
-TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_supported) {
- NodeWithoutStorageApiService dummy_node(10, false, _slobrok);
- _node_0->wait_until_visible_in_slobrok(to_slobrok_id(dummy_node.node_address()));
-
- // Initially we assume targets are on a new enough version to understand storage API RPCs.
- EXPECT_TRUE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
- EXPECT_TRUE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
-
- // Send to an endpoint exposing RPC but not the Storage API server method.
- // It will bounce back immediately with an FRT "no such method" error.
- auto cmd = _node_0->create_dummy_put_command();
- cmd->setAddress(dummy_node.node_address());
- _node_0->send_request(cmd);
- auto bounced_msg = _node_0->wait_and_receive_single_message();
- ASSERT_TRUE(bounced_msg);
-
- // For now (and for the sake of simplicity), fall back to assuming no targets
- // support direct storage API RPC.
- EXPECT_FALSE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
- EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
-}
-
TEST_F(StorageApiRpcServiceTest, malformed_request_header_returns_rpc_error) {
auto& supervisor = _node_0->shared_rpc_resources().supervisor();
auto* req = supervisor.AllocRPCRequest();
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 5a3ebebcd9e..4c375a30867 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -35,10 +35,6 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::Do
: _docAPIMsg(std::move(msg))
{ }
-StorageTransportContext::StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg)
- : _storageProtocolMsg(std::move(msg))
-{ }
-
StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request)
: _request(std::move(request))
{ }
@@ -105,18 +101,6 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr)));
process(std::move(cmd));
- } else if (protocolName == mbusprot::StorageProtocol::NAME) {
- std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release()));
-
- assert(storMsgPtr);
-
- //TODO: Can it be moved ?
- std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand();
- cmd->setTimeout(storMsgPtr->getTimeRemaining());
- cmd->setTrace(storMsgPtr->steal_trace());
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
-
- process(std::move(cmd));
} else {
LOGBM(warning, "Received unsupported message type %d for protocol '%s'",
msg->getType(), msg->getProtocol().c_str());
@@ -145,26 +129,6 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
const vespalib::string& protocolName = message->getProtocol();
if (protocolName == documentapi::DocumentProtocol::NAME) {
convertedReply = static_cast<documentapi::DocumentMessage &>(*message).createReply();
- } else if (protocolName == mbusprot::StorageProtocol::NAME) {
- std::shared_ptr<api::StorageReply> repl(static_cast<mbusprot::StorageCommand &>(*message).getCommand()->makeReply());
- auto sreply = std::make_unique<mbusprot::StorageReply>(repl);
-
- if (reply->hasErrors()) {
- // Convert only the first error since storageapi only
- // supports one return code.
- uint32_t mbuscode = reply->getError(0).getCode();
- api::ReturnCode::Result code((api::ReturnCode::Result) mbuscode);
- // Encode mbuscode into message not to lose it
- sreply->getReply()->setResult(storage::api::ReturnCode(
- code,
- mbus::ErrorCode::getName(mbuscode)
- + vespalib::string(": ")
- + reply->getError(0).getMessage()
- + vespalib::string(" (from ")
- + reply->getError(0).getService()
- + vespalib::string(")")));
- }
- convertedReply = std::move(sreply);
} else {
LOG(warning, "Received reply of unhandled protocol '%s'", protocolName.c_str());
return;
@@ -414,9 +378,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
// config here.
auto documentTypeRepo = _component.getTypeRepo()->documentTypeRepo;
_mbus = std::make_unique<mbus::RPCMessageBus>(
- mbus::ProtocolSet()
- .add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo))
- .add(std::make_shared<mbusprot::StorageProtocol>(documentTypeRepo)),
+ mbus::ProtocolSet().add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo)),
params,
_configUri);
@@ -554,23 +516,13 @@ CommunicationManager::sendCommand(
framework::MilliSecTimer startTime(_component.getClock());
switch (address.getProtocol()) {
- case api::StorageMessageAddress::Protocol::STORAGE:
+ case api::StorageMessageAddress::Protocol::STORAGE:
{
LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
- if (_storage_api_rpc_service->target_supports_direct_rpc(address)) {
- _storage_api_rpc_service->send_rpc_v1_request(msg);
- } else {
- auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
-
- cmd->setContext(mbus::Context(msg->getMsgId()));
- cmd->setRetryEnabled(false);
- cmd->setTimeRemaining(msg->getTimeout());
- cmd->setTrace(msg->steal_trace());
- sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route());
- }
+ _storage_api_rpc_service->send_rpc_v1_request(msg);
break;
}
- case api::StorageMessageAddress::Protocol::DOCUMENT:
+ case api::StorageMessageAddress::Protocol::DOCUMENT:
{
MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converting storageapi message to documentapi");
@@ -666,47 +618,32 @@ CommunicationManager::sendMessageBusReply(
mbus::Reply::UP replyUP;
LOG(spam, "Sending message bus reply %s", reply->toString().c_str());
+ assert(context._docAPIMsg); // StorageProtocol no longer uses MessageBus carrier.
- // If this was originally documentapi, create a reply now and transfer the
- // state.
- if (context._docAPIMsg) {
- if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) {
- replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage());
- replyUP->swapState(*context._docAPIMsg);
- replyUP->setTrace(reply->steal_trace());
- replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
- reply->getResult().getMessage()));
- } else {
- replyUP = context._docAPIMsg->createReply();
- replyUP->swapState(*context._docAPIMsg);
- replyUP->setTrace(reply->steal_trace());
- replyUP->setMessage(std::move(context._docAPIMsg));
- _docApiConverter.transferReplyState(*reply, *replyUP);
- }
- } else if (context._storageProtocolMsg) {
- replyUP = std::make_unique<mbusprot::StorageReply>(reply);
- if (reply->getResult().getResult() != api::ReturnCode::OK) {
- replyUP->addError(mbus::Error(reply->getResult().getResult(), reply->getResult().getMessage()));
- }
-
- replyUP->swapState(*context._storageProtocolMsg);
+ // Create an MBus reply and transfer state to it.
+ if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) {
+ replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage());
+ replyUP->swapState(*context._docAPIMsg);
replyUP->setTrace(reply->steal_trace());
- replyUP->setMessage(std::move(context._storageProtocolMsg));
+ replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
+ reply->getResult().getMessage()));
+ } else {
+ replyUP = context._docAPIMsg->createReply();
+ replyUP->swapState(*context._docAPIMsg);
+ replyUP->setTrace(reply->steal_trace());
+ replyUP->setMessage(std::move(context._docAPIMsg));
+ _docApiConverter.transferReplyState(*reply, *replyUP);
}
- if (replyUP) {
- // Forward message only if it was successfully stored in storage.
- if (!replyUP->hasErrors()) {
- mbus::Message::UP messageUP = replyUP->getMessage();
+ if (!replyUP->hasErrors()) {
+ mbus::Message::UP messageUP = replyUP->getMessage();
- if (messageUP && messageUP->getRoute().hasHops()) {
- messageUP->setContext(mbus::Context(FORWARDED_MESSAGE));
- _sourceSession->send(std::move(messageUP));
- }
+ if (messageUP && messageUP->getRoute().hasHops()) {
+ messageUP->setContext(mbus::Context(FORWARDED_MESSAGE));
+ _sourceSession->send(std::move(messageUP));
}
-
- _messageBusSession->reply(std::move(replyUP));
}
+ _messageBusSession->reply(std::move(replyUP));
}
bool
@@ -779,8 +716,6 @@ void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo);
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
_earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)));
- auto newStorageProtocol = std::make_shared<mbusprot::StorageProtocol>(repo);
- _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newStorageProtocol)));
}
if (_message_codec_provider) {
_message_codec_provider->update_atomically(repo);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index d52fb56cf20..6f953411cac 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -55,12 +55,10 @@ class RPCRequestWrapper;
class StorageTransportContext : public api::TransportContext {
public:
explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
- explicit StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg);
explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
~StorageTransportContext() override;
std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
- std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg;
std::unique_ptr<RPCRequestWrapper> _request;
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 78a9956c334..006193f0c37 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -37,8 +37,7 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
_message_codec_provider(message_codec_provider),
_params(params),
_target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(),
- params.num_rpc_targets_per_node)),
- _direct_rpc_supported(true)
+ params.num_rpc_targets_per_node))
{
register_server_methods(rpc_resources);
}
@@ -312,8 +311,8 @@ void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req,
auto& cmd = *req_ctx._originator_cmd;
api::ReturnCode error;
if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
- mark_peer_without_direct_rpc_support(*cmd.getAddress());
- error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported");
+ error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Legacy MessageBus StorageAPI transport is no longer supported. "
+ "Old nodes must be upgraded to a newer Vespa version.");
} else {
error = map_frt_error_to_storage_api_error(req, req_ctx);
}
@@ -378,22 +377,6 @@ StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessag
return api::ReturnCode(error_code, std::move(error_msg));
}
-void StorageApiRpcService::mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr) {
- bool expected = true;
- if (_direct_rpc_supported.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
- LOG(info, "Node %s does not support direct Storage API RPC; falling back "
- "to legacy MessageBus protocol. Not logging this for any further nodes",
- addr.toString().c_str());
- }
-}
-
-bool StorageApiRpcService::target_supports_direct_rpc(
- [[maybe_unused]] const api::StorageMessageAddress& addr) const noexcept {
- // Stale reads isn't an issue here, since the worst case is just receiving
- // a few more "no such method" errors.
- return _direct_rpc_supported.load(std::memory_order_relaxed);
-}
-
bool
StorageApiRpcService::address_visible_in_slobrok_uncached(
const api::StorageMessageAddress& addr) const noexcept
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 2526cf5434c..3166abba956 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -47,7 +47,6 @@ private:
MessageCodecProvider& _message_codec_provider;
const Params _params;
std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
- std::atomic<bool> _direct_rpc_supported;
public:
StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
@@ -55,7 +54,6 @@ public:
const Params& params);
~StorageApiRpcService() override;
- [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
// Bypasses resolver cache and returns whether local Slobrok mirror has at least 1 spec for the given address.
[[nodiscard]] bool address_visible_in_slobrok_uncached(const api::StorageMessageAddress& addr) const noexcept;
@@ -91,8 +89,6 @@ private:
api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const;
-
- void mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr);
};
} // rpc