summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp36
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp111
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp23
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h4
5 files changed, 26 insertions, 150 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index cd80244af87..8b8801f9cf5 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -196,10 +196,6 @@ public:
return _messages.pop_first_message();
}
- bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept {
- return _service->target_supports_direct_rpc(addr);
- }
-
void send_raw_request_and_expect_error(StorageApiNode& node,
FRT_RPCRequest* req,
const vespalib::string& expected_msg) {
@@ -220,16 +216,6 @@ StorageApiNode::~StorageApiNode() {
_shared_rpc_resources->shutdown();
}
-struct NodeWithoutStorageApiService : RpcNode {
- NodeWithoutStorageApiService(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
- : RpcNode(node_index, is_distributor, slobrok)
- {
- _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
- // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
- wait_until_visible_in_slobrok(_slobrok_id);
- }
-};
-
} // anonymous namespace
// TODO consider completely mocking Slobrok to avoid any race conditions during node registration
@@ -357,28 +343,6 @@ TEST_F(StorageApiRpcServiceTest, response_trace_only_propagated_if_trace_level_s
EXPECT_THAT(trace_str, Not(HasSubstr("Doing cool things")));
}
-TEST_F(StorageApiRpcServiceTest, rpc_method_not_found_toggles_rpc_as_not_supported) {
- NodeWithoutStorageApiService dummy_node(10, false, _slobrok);
- _node_0->wait_until_visible_in_slobrok(to_slobrok_id(dummy_node.node_address()));
-
- // Initially we assume targets are on a new enough version to understand storage API RPCs.
- EXPECT_TRUE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
- EXPECT_TRUE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
-
- // Send to an endpoint exposing RPC but not the Storage API server method.
- // It will bounce back immediately with an FRT "no such method" error.
- auto cmd = _node_0->create_dummy_put_command();
- cmd->setAddress(dummy_node.node_address());
- _node_0->send_request(cmd);
- auto bounced_msg = _node_0->wait_and_receive_single_message();
- ASSERT_TRUE(bounced_msg);
-
- // For now (and for the sake of simplicity), fall back to assuming no targets
- // support direct storage API RPC.
- EXPECT_FALSE(_node_0->target_supports_direct_rpc(dummy_node.node_address()));
- EXPECT_FALSE(_node_0->target_supports_direct_rpc(_node_1->node_address()));
-}
-
TEST_F(StorageApiRpcServiceTest, malformed_request_header_returns_rpc_error) {
auto& supervisor = _node_0->shared_rpc_resources().supervisor();
auto* req = supervisor.AllocRPCRequest();
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 5a3ebebcd9e..4c375a30867 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -35,10 +35,6 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::Do
: _docAPIMsg(std::move(msg))
{ }
-StorageTransportContext::StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg)
- : _storageProtocolMsg(std::move(msg))
-{ }
-
StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request)
: _request(std::move(request))
{ }
@@ -105,18 +101,6 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr)));
process(std::move(cmd));
- } else if (protocolName == mbusprot::StorageProtocol::NAME) {
- std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release()));
-
- assert(storMsgPtr);
-
- //TODO: Can it be moved ?
- std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand();
- cmd->setTimeout(storMsgPtr->getTimeRemaining());
- cmd->setTrace(storMsgPtr->steal_trace());
- cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
-
- process(std::move(cmd));
} else {
LOGBM(warning, "Received unsupported message type %d for protocol '%s'",
msg->getType(), msg->getProtocol().c_str());
@@ -145,26 +129,6 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
const vespalib::string& protocolName = message->getProtocol();
if (protocolName == documentapi::DocumentProtocol::NAME) {
convertedReply = static_cast<documentapi::DocumentMessage &>(*message).createReply();
- } else if (protocolName == mbusprot::StorageProtocol::NAME) {
- std::shared_ptr<api::StorageReply> repl(static_cast<mbusprot::StorageCommand &>(*message).getCommand()->makeReply());
- auto sreply = std::make_unique<mbusprot::StorageReply>(repl);
-
- if (reply->hasErrors()) {
- // Convert only the first error since storageapi only
- // supports one return code.
- uint32_t mbuscode = reply->getError(0).getCode();
- api::ReturnCode::Result code((api::ReturnCode::Result) mbuscode);
- // Encode mbuscode into message not to lose it
- sreply->getReply()->setResult(storage::api::ReturnCode(
- code,
- mbus::ErrorCode::getName(mbuscode)
- + vespalib::string(": ")
- + reply->getError(0).getMessage()
- + vespalib::string(" (from ")
- + reply->getError(0).getService()
- + vespalib::string(")")));
- }
- convertedReply = std::move(sreply);
} else {
LOG(warning, "Received reply of unhandled protocol '%s'", protocolName.c_str());
return;
@@ -414,9 +378,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
// config here.
auto documentTypeRepo = _component.getTypeRepo()->documentTypeRepo;
_mbus = std::make_unique<mbus::RPCMessageBus>(
- mbus::ProtocolSet()
- .add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo))
- .add(std::make_shared<mbusprot::StorageProtocol>(documentTypeRepo)),
+ mbus::ProtocolSet().add(std::make_shared<documentapi::DocumentProtocol>(documentTypeRepo)),
params,
_configUri);
@@ -554,23 +516,13 @@ CommunicationManager::sendCommand(
framework::MilliSecTimer startTime(_component.getClock());
switch (address.getProtocol()) {
- case api::StorageMessageAddress::Protocol::STORAGE:
+ case api::StorageMessageAddress::Protocol::STORAGE:
{
LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
- if (_storage_api_rpc_service->target_supports_direct_rpc(address)) {
- _storage_api_rpc_service->send_rpc_v1_request(msg);
- } else {
- auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
-
- cmd->setContext(mbus::Context(msg->getMsgId()));
- cmd->setRetryEnabled(false);
- cmd->setTimeRemaining(msg->getTimeout());
- cmd->setTrace(msg->steal_trace());
- sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route());
- }
+ _storage_api_rpc_service->send_rpc_v1_request(msg);
break;
}
- case api::StorageMessageAddress::Protocol::DOCUMENT:
+ case api::StorageMessageAddress::Protocol::DOCUMENT:
{
MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converting storageapi message to documentapi");
@@ -666,47 +618,32 @@ CommunicationManager::sendMessageBusReply(
mbus::Reply::UP replyUP;
LOG(spam, "Sending message bus reply %s", reply->toString().c_str());
+ assert(context._docAPIMsg); // StorageProtocol no longer uses MessageBus carrier.
- // If this was originally documentapi, create a reply now and transfer the
- // state.
- if (context._docAPIMsg) {
- if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) {
- replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage());
- replyUP->swapState(*context._docAPIMsg);
- replyUP->setTrace(reply->steal_trace());
- replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
- reply->getResult().getMessage()));
- } else {
- replyUP = context._docAPIMsg->createReply();
- replyUP->swapState(*context._docAPIMsg);
- replyUP->setTrace(reply->steal_trace());
- replyUP->setMessage(std::move(context._docAPIMsg));
- _docApiConverter.transferReplyState(*reply, *replyUP);
- }
- } else if (context._storageProtocolMsg) {
- replyUP = std::make_unique<mbusprot::StorageReply>(reply);
- if (reply->getResult().getResult() != api::ReturnCode::OK) {
- replyUP->addError(mbus::Error(reply->getResult().getResult(), reply->getResult().getMessage()));
- }
-
- replyUP->swapState(*context._storageProtocolMsg);
+ // Create an MBus reply and transfer state to it.
+ if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) {
+ replyUP = std::make_unique<documentapi::WrongDistributionReply>(reply->getResult().getMessage());
+ replyUP->swapState(*context._docAPIMsg);
replyUP->setTrace(reply->steal_trace());
- replyUP->setMessage(std::move(context._storageProtocolMsg));
+ replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
+ reply->getResult().getMessage()));
+ } else {
+ replyUP = context._docAPIMsg->createReply();
+ replyUP->swapState(*context._docAPIMsg);
+ replyUP->setTrace(reply->steal_trace());
+ replyUP->setMessage(std::move(context._docAPIMsg));
+ _docApiConverter.transferReplyState(*reply, *replyUP);
}
- if (replyUP) {
- // Forward message only if it was successfully stored in storage.
- if (!replyUP->hasErrors()) {
- mbus::Message::UP messageUP = replyUP->getMessage();
+ if (!replyUP->hasErrors()) {
+ mbus::Message::UP messageUP = replyUP->getMessage();
- if (messageUP && messageUP->getRoute().hasHops()) {
- messageUP->setContext(mbus::Context(FORWARDED_MESSAGE));
- _sourceSession->send(std::move(messageUP));
- }
+ if (messageUP && messageUP->getRoute().hasHops()) {
+ messageUP->setContext(mbus::Context(FORWARDED_MESSAGE));
+ _sourceSession->send(std::move(messageUP));
}
-
- _messageBusSession->reply(std::move(replyUP));
}
+ _messageBusSession->reply(std::move(replyUP));
}
bool
@@ -779,8 +716,6 @@ void CommunicationManager::updateMessagebusProtocol(const std::shared_ptr<const
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(repo);
std::lock_guard<std::mutex> guard(_earlierGenerationsLock);
_earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newDocumentProtocol)));
- auto newStorageProtocol = std::make_shared<mbusprot::StorageProtocol>(repo);
- _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newStorageProtocol)));
}
if (_message_codec_provider) {
_message_codec_provider->update_atomically(repo);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index d52fb56cf20..6f953411cac 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -55,12 +55,10 @@ class RPCRequestWrapper;
class StorageTransportContext : public api::TransportContext {
public:
explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
- explicit StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg);
explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
~StorageTransportContext() override;
std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
- std::unique_ptr<mbusprot::StorageCommand> _storageProtocolMsg;
std::unique_ptr<RPCRequestWrapper> _request;
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 78a9956c334..006193f0c37 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -37,8 +37,7 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
_message_codec_provider(message_codec_provider),
_params(params),
_target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(),
- params.num_rpc_targets_per_node)),
- _direct_rpc_supported(true)
+ params.num_rpc_targets_per_node))
{
register_server_methods(rpc_resources);
}
@@ -312,8 +311,8 @@ void StorageApiRpcService::handle_request_done_rpc_error(FRT_RPCRequest& req,
auto& cmd = *req_ctx._originator_cmd;
api::ReturnCode error;
if (req.GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
- mark_peer_without_direct_rpc_support(*cmd.getAddress());
- error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Direct Storage RPC protocol not supported");
+ error = api::ReturnCode(api::ReturnCode::NOT_CONNECTED, "Legacy MessageBus StorageAPI transport is no longer supported. "
+ "Old nodes must be upgraded to a newer Vespa version.");
} else {
error = map_frt_error_to_storage_api_error(req, req_ctx);
}
@@ -378,22 +377,6 @@ StorageApiRpcService::make_no_address_for_service_error(const api::StorageMessag
return api::ReturnCode(error_code, std::move(error_msg));
}
-void StorageApiRpcService::mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr) {
- bool expected = true;
- if (_direct_rpc_supported.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
- LOG(info, "Node %s does not support direct Storage API RPC; falling back "
- "to legacy MessageBus protocol. Not logging this for any further nodes",
- addr.toString().c_str());
- }
-}
-
-bool StorageApiRpcService::target_supports_direct_rpc(
- [[maybe_unused]] const api::StorageMessageAddress& addr) const noexcept {
- // Stale reads isn't an issue here, since the worst case is just receiving
- // a few more "no such method" errors.
- return _direct_rpc_supported.load(std::memory_order_relaxed);
-}
-
bool
StorageApiRpcService::address_visible_in_slobrok_uncached(
const api::StorageMessageAddress& addr) const noexcept
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 2526cf5434c..3166abba956 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -47,7 +47,6 @@ private:
MessageCodecProvider& _message_codec_provider;
const Params _params;
std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
- std::atomic<bool> _direct_rpc_supported;
public:
StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
@@ -55,7 +54,6 @@ public:
const Params& params);
~StorageApiRpcService() override;
- [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
// Bypasses resolver cache and returns whether local Slobrok mirror has at least 1 spec for the given address.
[[nodiscard]] bool address_visible_in_slobrok_uncached(const api::StorageMessageAddress& addr) const noexcept;
@@ -91,8 +89,6 @@ private:
api::ReturnCode map_frt_error_to_storage_api_error(FRT_RPCRequest& req, const RpcRequestContext& req_ctx);
api::ReturnCode make_no_address_for_service_error(const api::StorageMessageAddress& addr) const;
-
- void mark_peer_without_direct_rpc_support(const api::StorageMessageAddress& addr);
};
} // rpc