diff options
7 files changed, 19 insertions, 227 deletions
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp index 0ae957ea340..80bfc45184f 100644 --- a/messagebus/src/tests/sendadapter/sendadapter.cpp +++ b/messagebus/src/tests/sendadapter/sendadapter.cpp @@ -5,7 +5,6 @@ #include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> -#include <vespa/messagebus/network/rpcsendv1.h> #include <vespa/messagebus/network/rpcsendv2.h> #include <vespa/vespalib/testkit/testapp.h> @@ -211,10 +210,8 @@ TEST("test that all known versions are present") { TestData data; ASSERT_TRUE(data.start()); EXPECT_FALSE(data._srcServer.net.getSendAdapter(vespalib::Version(4, 999)) != nullptr); - EXPECT_TRUE(data._srcServer.net.getSendAdapter(vespalib::Version(5, 0)) != nullptr); - EXPECT_TRUE(dynamic_cast<mbus::RPCSendV1 *>(data._srcServer.net.getSendAdapter(vespalib::Version(5, 0))) != nullptr); - EXPECT_TRUE(data._srcServer.net.getSendAdapter(vespalib::Version(6, 148)) != nullptr); - EXPECT_TRUE(dynamic_cast<mbus::RPCSendV1 *>(data._srcServer.net.getSendAdapter(vespalib::Version(6, 148))) != nullptr); + EXPECT_FALSE(data._srcServer.net.getSendAdapter(vespalib::Version(5, 0)) != nullptr); + EXPECT_FALSE(data._srcServer.net.getSendAdapter(vespalib::Version(6, 148)) != nullptr); EXPECT_TRUE(data._srcServer.net.getSendAdapter(vespalib::Version(6, 149)) != nullptr); EXPECT_TRUE(dynamic_cast<mbus::RPCSendV2 *>(data._srcServer.net.getSendAdapter(vespalib::Version(6, 149))) != nullptr); EXPECT_TRUE(data._srcServer.net.getSendAdapter(vespalib::Version(9, 999)) != nullptr); @@ -224,7 +221,7 @@ TEST("test that all known versions are present") { TEST("test that we can send between multiple versions") { TestData data; ASSERT_TRUE(data.start()); - TEST_DO(testSendAdapters(data, {vespalib::Version(5, 0), vespalib::Version(6, 148), vespalib::Version(6, 149), vespalib::Version(9, 999)})); + TEST_DO(testSendAdapters(data, {vespalib::Version(6, 149), vespalib::Version(9, 999)})); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt index fffef5518c9..8b058ef5fcc 100644 --- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt +++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_library(messagebus_network OBJECT rpcnetwork.cpp rpcnetworkparams.cpp rpcsend.cpp - rpcsendv1.cpp rpcsendv2.cpp rpcservice.cpp rpcserviceaddress.cpp diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 0f7ebeb9a36..77d8ca24cfc 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpcnetwork.h" #include "rpcservicepool.h" -#include "rpcsendv1.h" #include "rpcsendv2.h" #include "rpctargetpool.h" #include "rpcnetworkparams.h" @@ -137,7 +136,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)), _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 64_Ki)), - _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), _compressionConfig(params.getCompressionConfig()), @@ -195,9 +193,7 @@ RPCNetwork::attach(INetworkOwner &owner) LOG_ASSERT(_owner == nullptr); _owner = &owner; - _sendV1->attach(*this); _sendV2->attach(*this); - _sendAdapters[vespalib::Version(5)] = _sendV1.get(); _sendAdapters[vespalib::Version(6, 149)] = _sendV2.get(); FRT_ReflectionBuilder builder(_orb.get()); diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 38f35d1266a..e706431f90d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -66,7 +66,6 @@ private: std::unique_ptr<FNET_Task> _targetPoolTask; std::unique_ptr<RPCServicePool> _servicePool; std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; - std::unique_ptr<RPCSendAdapter> _sendV1; std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp deleted file mode 100644 index 7342c264bfa..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "rpcsendv1.h" -#include "rpcnetwork.h" -#include "rpcserviceaddress.h" -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/error.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/frt/reflection.h> - -using vespalib::make_string; - -namespace mbus { - -namespace { - -const char *METHOD_NAME = "mbus.send1"; -const char *METHOD_PARAMS = "sssbilsxi"; -const char *METHOD_RETURN = "sdISSsxs"; - -} - -bool RPCSendV1::isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons) -{ - return (method == METHOD_NAME) && - (request == METHOD_PARAMS) && - (respons == METHOD_RETURN); -} - -const char * -RPCSendV1::getReturnSpec() const { - return METHOD_RETURN; -} - -void -RPCSendV1::build(FRT_ReflectionBuilder & builder) -{ - builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, FRT_METHOD(RPCSendV1::invoke), this); - builder.MethodDesc("Send a message bus request and get a reply back."); - builder.ParamDesc("version", "The version of the message."); - builder.ParamDesc("route", "Names of additional hops to visit."); - builder.ParamDesc("session", "The local session that should receive this message."); - builder.ParamDesc("retryEnabled", "Whether or not this message can be resent."); - builder.ParamDesc("retry", "The number of times the sending of this message has been retried."); - builder.ParamDesc("timeRemaining", "The number of milliseconds until timeout."); - builder.ParamDesc("protocol", "The name of the protocol that knows how to decode this message."); - builder.ParamDesc("payload", "The protocol specific message payload."); - builder.ParamDesc("level", "The trace level of the message."); - builder.ReturnDesc("version", "The lowest version the message was serialized as."); - builder.ReturnDesc("retry", "The retry request of the reply."); - builder.ReturnDesc("errorCodes", "The reply error codes."); - builder.ReturnDesc("errorMessages", "The reply error messages."); - builder.ReturnDesc("errorServices", "The reply error service names."); - builder.ReturnDesc("protocol", "The name of the protocol that knows how to decode this reply."); - builder.ReturnDesc("payload", "The protocol specific reply payload."); - builder.ReturnDesc("trace", "A string representation of the trace."); -} - -void -RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, duration timeRemaining) const -{ - - FRT_Values &args = *req.GetParams(); - req.SetMethodName(METHOD_NAME); - args.AddString(version.toString().c_str()); - args.AddString(route.toString().c_str()); - args.AddString(address.getSessionName().c_str()); - args.AddInt8(msg.getRetryEnabled() ? 1 : 0); - args.AddInt32(msg.getRetry()); - args.AddInt64(vespalib::count_ms(timeRemaining)); - args.AddString(msg.getProtocol().c_str()); - filler.fill(args); - args.AddInt32(traceLevel); -} - -namespace { - -class ParamsV1 : public RPCSend::Params -{ -public: - ParamsV1(const FRT_Values &args) : _args(args) { } - - uint32_t getTraceLevel() const override { return _args[8]._intval32; } - bool useRetry() const override { return _args[3]._intval8 != 0; } - uint32_t getRetries() const override { return _args[4]._intval32; } - duration getRemainingTime() const override { return std::chrono::milliseconds(_args[5]._intval64); } - - vespalib::Version getVersion() const override { - return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); - } - vespalib::stringref getRoute() const override { - return vespalib::stringref(_args[1]._string._str, _args[1]._string._len); - } - vespalib::stringref getSession() const override { - return vespalib::stringref(_args[2]._string._str, _args[2]._string._len); - } - vespalib::stringref getProtocol() const override { - return vespalib::stringref(_args[6]._string._str, _args[6]._string._len); - } - BlobRef getPayload() const override { - return BlobRef(_args[7]._data._buf, _args[7]._data._len); - } -private: - const FRT_Values & _args; -}; - -} - -std::unique_ptr<RPCSend::Params> -RPCSendV1::toParams(const FRT_Values &args) const -{ - return std::make_unique<ParamsV1>(args); -} - - -std::unique_ptr<Reply> -RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error & error, vespalib::Trace & trace) const -{ - vespalib::Version version = vespalib::Version(ret[0]._string._str); - double retryDelay = ret[1]._double; - uint32_t *errorCodes = ret[2]._int32_array._pt; - uint32_t errorCodesLen = ret[2]._int32_array._len; - FRT_StringValue *errorMessages = ret[3]._string_array._pt; - uint32_t errorMessagesLen = ret[3]._string_array._len; - FRT_StringValue *errorServices = ret[4]._string_array._pt; - uint32_t errorServicesLen = ret[4]._string_array._len; - const char *protocolName = ret[5]._string._str; - BlobRef payload(ret[6]._data._buf, ret[6]._data._len); - const char *traceStr = ret[7]._string._str; - - Reply::UP reply; - if (payload.size() > 0) { - reply = decode(protocolName, version, payload, error); - } - if ( ! reply ) { - reply = std::make_unique<EmptyReply>(); - } - reply->setRetryDelay(retryDelay); - for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) { - reply->addError(Error(errorCodes[i], errorMessages[i]._str, - errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str())); - } - trace.addChild(TraceNode::decode(traceStr)); - return reply; -} - -void -RPCSendV1::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const { - ret.AddString(version.c_str()); - ret.AddDouble(reply.getRetryDelay()); - - uint32_t errorCount = reply.getNumErrors(); - uint32_t *errorCodes = ret.AddInt32Array(errorCount); - FRT_StringValue *errorMessages = ret.AddStringArray(errorCount); - FRT_StringValue *errorServices = ret.AddStringArray(errorCount); - for (uint32_t i = 0; i < errorCount; ++i) { - errorCodes[i] = reply.getError(i).getCode(); - ret.SetString(errorMessages + i, reply.getError(i).getMessage().c_str()); - ret.SetString(errorServices + i, reply.getError(i).getService().c_str()); - } - - ret.AddString(reply.getProtocol().c_str()); - ret.AddData(std::move(payload.payload()), payload.size()); - if (reply.getTrace().getLevel() > 0) { - ret.AddString(reply.getTrace().encode().c_str()); - } else { - ret.AddString(""); - } -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h deleted file mode 100644 index 7010df9dde5..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "rpcsend.h" - -namespace mbus { - -class RPCSendV1 : public RPCSend { -public: - static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons); -private: - void build(FRT_ReflectionBuilder & builder) override; - const char * getReturnSpec() const override; - std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; - void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, duration timeRemaining) const override; - - std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, - Error & error, vespalib::Trace & trace) const override; - void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override; -}; - -} // namespace mbus diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp index b0c5545f3ac..f70ffcc2655 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp @@ -10,7 +10,6 @@ #include <vespa/messagebus/routing/routingtable.h> #include <vespa/messagebus/routing/routedirective.h> #include <vespa/messagebus/rpcmessagebus.h> -#include <vespa/messagebus/network/rpcsendv1.h> #include <vespa/messagebus/network/rpcsendv2.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/config/common/exceptions.h> @@ -19,8 +18,10 @@ #include <vespa/fnet/frt/supervisor.h> using config::ConfigGetter; +using config::InvalidConfigException; using messagebus::MessagebusConfig; using document::DocumentTypeRepo; +using namespace vespalib::make_string_short; namespace vesparoute { @@ -60,13 +61,11 @@ Application::main(int argc, char **argv) // _P_A_R_A_N_O_I_A_ mbus::RoutingTable::SP table = _mbus->getRoutingTable(_params.getProtocol()); if ( ! table) { - throw config::InvalidConfigException(vespalib::make_string("There is no routing table for protocol '%s'.", - _params.getProtocol().c_str())); + throw InvalidConfigException(fmt("There is no routing table for protocol '%s'.", _params.getProtocol().c_str())); } for (const std::string & hop : _params.getHops()) { if (table->getHop(hop) == NULL) { - throw config::InvalidConfigException(vespalib::make_string("There is no hop named '%s' for protocol '%s'.", - hop.c_str(), _params.getProtocol().c_str())); + throw InvalidConfigException(fmt("There is no hop named '%s' for protocol '%s'.", hop.c_str(), _params.getProtocol().c_str())); } } @@ -109,7 +108,7 @@ Application::parseArgs(int argc, char **argv) if (++arg < argc) { _params.setDocumentTypesConfigId(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'documenttypesconfigid'."); + throw InvalidConfigException("Missing value for parameter 'documenttypesconfigid'."); } } else if (strcasecmp(argv[arg], "--dump") == 0) { _params.setDump(true); @@ -121,7 +120,7 @@ Application::parseArgs(int argc, char **argv) if (++arg < argc) { _params.getHops().push_back(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'hop'."); + throw InvalidConfigException("Missing value for parameter 'hop'."); } } else if (strcasecmp(argv[arg], "--hops") == 0) { _params.setListHops(true); @@ -129,25 +128,25 @@ Application::parseArgs(int argc, char **argv) if (++arg < argc) { _params.getRPCNetworkParams().setIdentity(mbus::Identity(argv[arg])); } else { - throw config::InvalidConfigException("Missing value for parameter 'identity'."); + throw InvalidConfigException("Missing value for parameter 'identity'."); } } else if (strcasecmp(argv[arg], "--listenport") == 0) { if (++arg < argc) { _params.getRPCNetworkParams().setListenPort(atoi(argv[arg])); } else { - throw config::InvalidConfigException("Missing value for parameter 'listenport'."); + throw InvalidConfigException("Missing value for parameter 'listenport'."); } } else if (strcasecmp(argv[arg], "--protocol") == 0) { if (++arg < argc) { _params.setProtocol(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'protocol'."); + throw InvalidConfigException("Missing value for parameter 'protocol'."); } } else if (strcasecmp(argv[arg], "--route") == 0) { if (++arg < argc) { _params.getRoutes().push_back(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'route'."); + throw InvalidConfigException("Missing value for parameter 'route'."); } } else if (strcasecmp(argv[arg], "--routes") == 0) { _params.setListRoutes(true); @@ -155,7 +154,7 @@ Application::parseArgs(int argc, char **argv) if (++arg < argc) { _params.setRoutingConfigId(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'routingconfigid'."); + throw InvalidConfigException("Missing value for parameter 'routingconfigid'."); } } else if (strcasecmp(argv[arg], "--services") == 0) { _params.setListServices(true); @@ -163,12 +162,12 @@ Application::parseArgs(int argc, char **argv) if (++arg < argc) { _params.setSlobrokId(argv[arg]); } else { - throw config::InvalidConfigException("Missing value for parameter 'slobrokconfigid'."); + throw InvalidConfigException("Missing value for parameter 'slobrokconfigid'."); } } else if (strcasecmp(argv[arg], "--verify") == 0) { _params.setVerify(true); } else { - throw config::InvalidConfigException(vespalib::make_string("Unknown option '%s'.", argv[arg])); + throw InvalidConfigException(fmt("Unknown option '%s'.", argv[arg])); } } return true; @@ -206,7 +205,7 @@ Application::verifyRoute(const mbus::Route &route, std::set<std::string> &errors for (std::set<std::string>::iterator err = hopErrors.begin(); err != hopErrors.end(); ++err) { - errors.insert(vespalib::make_string("for hop '%s', %s", str.c_str(), err->c_str())); + errors.insert(fmt("for hop '%s', %s", str.c_str(), err->c_str())); } } } @@ -237,7 +236,7 @@ Application::verifyHop(const mbus::HopBlueprint &hop, std::set<std::string> &err if (hop.getDirective(0)->getType() == mbus::IHopDirective::TYPE_ROUTE) { const mbus::RouteDirective &dir = static_cast<const mbus::RouteDirective &>(*hop.getDirective(0)); if (table.getRoute(dir.getName()) == nullptr) { - errors.insert(vespalib::make_string("route '%s' not found", dir.getName().c_str())); + errors.insert(fmt("route '%s' not found", dir.getName().c_str())); return false; } else { return true; @@ -495,8 +494,7 @@ Application::isService(FRT_Supervisor &frt, const std::string &spec) const FRT_StringValue *retList = req->GetReturn()->GetValue(2)._string_array._pt; for (uint32_t i = 0; i < numMethods; ++i) { - if (mbus::RPCSendV1::isCompatible(methods[i]._str,argList[i]._str, retList[i]._str) || - mbus::RPCSendV2::isCompatible(methods[i]._str,argList[i]._str, retList[i]._str)) { + if (mbus::RPCSendV2::isCompatible(methods[i]._str,argList[i]._str, retList[i]._str)) { ret = true; break; } |