diff options
Diffstat (limited to 'messagebus/src')
12 files changed, 80 insertions, 73 deletions
diff --git a/messagebus/src/tests/routingspec/routingspec.cpp b/messagebus/src/tests/routingspec/routingspec.cpp index d5317dc3bb0..32d235c0a11 100644 --- a/messagebus/src/tests/routingspec/routingspec.cpp +++ b/messagebus/src/tests/routingspec/routingspec.cpp @@ -1,7 +1,4 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/fastos.h> -#include <vespa/log/log.h> -LOG_SETUP("routingspec_test"); #include <vespa/config/config.h> #include <vespa/messagebus/configagent.h> @@ -9,6 +6,8 @@ LOG_SETUP("routingspec_test"); #include <vespa/messagebus/routing/routingspec.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/messagebus/config-messagebus.h> +#include <vespa/config/helper/configgetter.hpp> + using namespace mbus; using namespace messagebus; diff --git a/messagebus/src/vespa/messagebus/context.h b/messagebus/src/vespa/messagebus/context.h index 087d41f8b80..359f32a35ec 100644 --- a/messagebus/src/vespa/messagebus/context.h +++ b/messagebus/src/vespa/messagebus/context.h @@ -2,8 +2,8 @@ #pragma once -#include <string.h> -#include <stdint.h> +#include <cstring> +#include <cstdint> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index c9067ef0aa4..f5a30c86d45 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -1,14 +1,13 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/messagebus/routing/routingnode.h> -#include <vespa/messagebus/routing/routingspec.h> -#include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/vstringfmt.h> #include "messagebus.h" -#include "imessagehandler.h" +#include "messenger.h" #include "emptyreply.h" #include "errorcode.h" #include "sendproxy.h" +#include "protocolrepository.h" +#include <vespa/messagebus/network/inetwork.h> +#include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> LOG_SETUP(".messagebus"); @@ -61,9 +60,7 @@ public: _msn(msn), _done(done), _gate(gate) - { - // empty - } + { } ~ShutdownTask() { _gate.countDown(); @@ -88,8 +85,8 @@ MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) : _lock("mbus::MessageBus::_lock", false), _routingTables(), _sessions(), - _protocolRepository(), - _msn(), + _protocolRepository(std::make_unique<ProtocolRepository>()), + _msn(std::make_unique<Messenger>()), _resender(), _maxPendingCount(0), _maxPendingSize(0), @@ -111,8 +108,8 @@ MessageBus::MessageBus(INetwork &net, const MessageBusParams ¶ms) : _lock("mbus::MessageBus::_lock", false), _routingTables(), _sessions(), - _protocolRepository(), - _msn(), + _protocolRepository(std::make_unique<ProtocolRepository>()), + _msn(std::make_unique<Messenger>()), _resender(), _maxPendingCount(params.getMaxPendingCount()), _maxPendingSize(params.getMaxPendingSize()), @@ -126,14 +123,14 @@ MessageBus::~MessageBus() { // all sessions must have been destroyed prior to this, // so no more traffic from clients - _msn.discardRecurrentTasks(); // no more traffic from recurrent tasks + _msn->discardRecurrentTasks(); // no more traffic from recurrent tasks _network.shutdown(); // no more traffic from network bool done = false; while (!done) { vespalib::Gate gate; - Messenger::ITask::UP task(new ShutdownTask(_network, _msn, done, gate)); - _msn.enqueue(std::move(task)); + Messenger::ITask::UP task(new ShutdownTask(_network, *_msn, done, gate)); + _msn->enqueue(std::move(task)); gate.await(); } } @@ -143,7 +140,7 @@ MessageBus::setup(const MessageBusParams ¶ms) { // Add all known protocols to the repository. for (uint32_t i = 0, len = params.getNumProtocols(); i < len; ++i) { - _protocolRepository.putProtocol(params.getProtocol(i)); + _protocolRepository->putProtocol(params.getProtocol(i)); } // Attach and start network. @@ -161,9 +158,9 @@ MessageBus::setup(const MessageBusParams ¶ms) _resender.reset(new Resender(retryPolicy)); Messenger::ITask::UP task(new ResenderTask(*_resender)); - _msn.addRecurrentTask(std::move(task)); + _msn->addRecurrentTask(std::move(task)); } - if (!_msn.start()) { + if (!_msn->start()) { throw vespalib::NetworkSetupFailureException("Failed to start messenger."); } } @@ -260,13 +257,13 @@ MessageBus::getRoutingPolicy(const string &protocolName, const string &policyName, const string &policyParam) { - return _protocolRepository.getRoutingPolicy(protocolName, policyName, policyParam); + return _protocolRepository->getRoutingPolicy(protocolName, policyName, policyParam); } void MessageBus::sync() { - _msn.sync(); + _msn->sync(); _network.sync(); // should not be necessary, as msn is intermediate } @@ -279,7 +276,7 @@ MessageBus::handleMessage(Message::UP msg) return; } SendProxy &proxy = *(new SendProxy(*this, _network, _resender.get())); // deletes self - _msn.deliverMessage(std::move(msg), proxy); + _msn->deliverMessage(std::move(msg), proxy); } bool @@ -301,20 +298,20 @@ MessageBus::setupRouting(const RoutingSpec &spec) LockGuard guard(_lock); std::swap(_routingTables, rtm); } - _protocolRepository.clearPolicyCache(); + _protocolRepository->clearPolicyCache(); return true; } IProtocol::SP MessageBus::getProtocol(const string &name) { - return _protocolRepository.getProtocol(name); + return _protocolRepository->getProtocol(name); } IProtocol::SP MessageBus::putProtocol(const IProtocol::SP & protocol) { - return _protocolRepository.putProtocol(protocol); + return _protocolRepository->putProtocol(protocol); } bool @@ -373,16 +370,16 @@ MessageBus::deliverMessage(Message::UP msg, const string &session) } if (msgHandler == NULL) { deliverError(std::move(msg), ErrorCode::UNKNOWN_SESSION, - vespalib::make_vespa_string( + vespalib::make_string( "Session '%s' does not exist.", session.c_str())); } else if (!checkPending(*msg)) { deliverError(std::move(msg), ErrorCode::SESSION_BUSY, - vespalib::make_vespa_string( + vespalib::make_string( "Session '%s' is busy, try again later.", session.c_str())); } else { - _msn.deliverMessage(std::move(msg), *msgHandler); + _msn->deliverMessage(std::move(msg), *msgHandler); } } @@ -400,7 +397,7 @@ MessageBus::deliverError(Message::UP msg, uint32_t errCode, const string &errMsg void MessageBus::deliverReply(Reply::UP reply, IReplyHandler &handler) { - _msn.deliverReply(std::move(reply), handler); + _msn->deliverReply(std::move(reply), handler); } const string diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index 483915eee25..bc4a08123a4 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -1,27 +1,28 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <map> -#include <vespa/messagebus/network/inetworkowner.h> -#include <vespa/messagebus/routing/resender.h> -#include <vespa/messagebus/routing/routingspec.h> -#include <vespa/messagebus/routing/routingtable.h> -#include <vespa/vespalib/util/sync.h> #include "destinationsession.h" #include "iconfighandler.h" #include "idiscardhandler.h" #include "intermediatesession.h" #include "messagebusparams.h" -#include "messenger.h" #include "protocolset.h" -#include "protocolrepository.h" #include "sourcesession.h" +#include <vespa/messagebus/network/inetworkowner.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/vespalib/util/sync.h> +#include <map> #include <string> #include <atomic> namespace mbus { class SendProxy; +class Messenger; +class Resender; +class INetwork; +class RoutingTable; +class ProtocolRepository; /** * A MessageBus object combined with an INetwork implementation makes up the central part of a messagebus setup. It is @@ -35,17 +36,18 @@ class MessageBus : public IMessageHandler, public IReplyHandler { private: - INetwork &_network; - vespalib::Lock _lock; - std::map<string, RoutingTable::SP> _routingTables; - std::map<string, IMessageHandler*> _sessions; - ProtocolRepository _protocolRepository; - Messenger _msn; - Resender::UP _resender; - std::atomic<uint32_t> _maxPendingCount; - std::atomic<uint32_t> _maxPendingSize; - std::atomic<uint32_t> _pendingCount; - std::atomic<uint32_t> _pendingSize; + using RoutingTableSP = std::shared_ptr<RoutingTable>; + INetwork &_network; + vespalib::Lock _lock; + std::map<string, RoutingTableSP> _routingTables; + std::map<string, IMessageHandler*> _sessions; + std::unique_ptr<ProtocolRepository> _protocolRepository; + std::unique_ptr<Messenger> _msn; + std::unique_ptr<Resender> _resender; + std::atomic<uint32_t> _maxPendingCount; + std::atomic<uint32_t> _maxPendingSize; + std::atomic<uint32_t> _pendingCount; + std::atomic<uint32_t> _pendingSize; /** * This method performs the common constructor tasks. @@ -185,7 +187,7 @@ public: * @return shared pointer to routing table * @param protocol the protocol name **/ - RoutingTable::SP getRoutingTable(const string &protocol); + RoutingTableSP getRoutingTable(const string &protocol); /** * Returns a routing policy that corresponds to the argument protocol name, policy name and policy parameter. This @@ -279,7 +281,7 @@ public: * * @return The underlying {@link Messenger} object. */ - Messenger & getMessenger() { return _msn; } + Messenger & getMessenger() { return *_msn; } // Implements IReplyHandler. void handleReply(Reply::UP reply); diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h index b218a0689a2..5070f3c6ca3 100644 --- a/messagebus/src/vespa/messagebus/network/inetwork.h +++ b/messagebus/src/vespa/messagebus/network/inetwork.h @@ -2,12 +2,13 @@ #pragma once #include <memory> -#include <vespa/messagebus/routing/routingnode.h> #include <vespa/slobrok/imirrorapi.h> #include "inetworkowner.h" namespace mbus { +class RoutingNode; + /** * This interface is used to hide away the implementation details of the network * code from the rest of the messagebus implementation. The methods defined in diff --git a/messagebus/src/vespa/messagebus/network/inetworkowner.h b/messagebus/src/vespa/messagebus/network/inetworkowner.h index 493f121783f..8205b04817c 100644 --- a/messagebus/src/vespa/messagebus/network/inetworkowner.h +++ b/messagebus/src/vespa/messagebus/network/inetworkowner.h @@ -2,11 +2,14 @@ #pragma once #include <memory> -#include <vespa/messagebus/iprotocol.h> -#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/messagebus/common.h> namespace mbus { + class Reply; + class IProtocol; + class IReplyHandler; + class Message; /** * A network owner is the object that instantiates and uses a network. The API to send messages * across the network is part of the Network interface, whereas this interface exposes the required @@ -26,7 +29,7 @@ public: * @param name The name of the protocol to return. * @return The named protocol. */ - virtual IProtocol::SP getProtocol(const string &name) = 0; + virtual std::shared_ptr<IProtocol> getProtocol(const string &name) = 0; /** * All messages that arrive in the network layer is passed to its owner through this function. @@ -34,7 +37,7 @@ public: * @param message The message that just arrived from the network. * @param session The name of the session that is the recipient of the request. */ - virtual void deliverMessage(Message::UP message, const string &session) = 0; + virtual void deliverMessage(std::unique_ptr<Message> message, const string &session) = 0; /** * All replies that arrive in the network layer is passed through this to unentangle it from the network thread. @@ -42,7 +45,7 @@ public: * @param reply The reply that just arrived from the network. * @param handler The handler that is to receive the reply. */ - virtual void deliverReply(Reply::UP reply, IReplyHandler &handler) = 0; + virtual void deliverReply(std::unique_ptr<Reply> reply, IReplyHandler &handler) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index b0101753452..a5caaa127c3 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -1,12 +1,11 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "inetworkowner.h" #include "rpcnetwork.h" -#include "rpcsendv1.h" -#include "rpcservice.h" #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/iprotocol.h> #include <vespa/messagebus/tracelevel.h> +#include <vespa/messagebus/routing/routingnode.h> #include <vespa/slobrok/sbregister.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/log/log.h> diff --git a/messagebus/src/vespa/messagebus/routing/routingcontext.cpp b/messagebus/src/vespa/messagebus/routing/routingcontext.cpp index 8732ed4fc08..f671890c0c1 100644 --- a/messagebus/src/vespa/messagebus/routing/routingcontext.cpp +++ b/messagebus/src/vespa/messagebus/routing/routingcontext.cpp @@ -1,6 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "route.h" #include "routingnode.h" +#include "policydirective.h" +#include <vespa/messagebus/network/inetwork.h> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/routing/routingcontext.h b/messagebus/src/vespa/messagebus/routing/routingcontext.h index 9edcd5aa014..e2e2dcf0cca 100644 --- a/messagebus/src/vespa/messagebus/routing/routingcontext.h +++ b/messagebus/src/vespa/messagebus/routing/routingcontext.h @@ -1,17 +1,18 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "routingnodeiterator.h" #include <vespa/messagebus/context.h> -#include <vespa/messagebus/messagebus.h> #include <vespa/slobrok/imirrorapi.h> #include <set> -#include "hop.h" -#include "policydirective.h" -#include "routingnodeiterator.h" namespace mbus { class RoutingNode; +class PolicyDirective; +class MessageBus; +class Message; +class Error; /** * This context object is what is seen by {@link RoutingPolicy} when doing both select() and merge(). It @@ -192,7 +193,7 @@ public: * @param reply The reply to set. * @return This, to allow chaining. */ - RoutingContext &setReply(Reply::UP reply); + RoutingContext &setReply(std::unique_ptr<Reply> reply); /** * This is a convenience method to call {@link #setError(Error)}. diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp index 7bfb31c2e68..00922d76759 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp +++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp @@ -1,14 +1,15 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "routingnode.h" #include "errordirective.h" -#include "policydirective.h" #include "routedirective.h" +#include "routingtable.h" +#include "policydirective.h" #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/tracelevel.h> -#include <stack> #include <vespa/vespalib/util/atomic.h> -#include <vespa/vespalib/util/vstringfmt.h> +#include <vespa/messagebus/network/inetwork.h> +#include <stack> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h index 92a4b835ba1..aa854dce5ef 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.h +++ b/messagebus/src/vespa/messagebus/routing/routingnode.h @@ -6,7 +6,6 @@ #include <vespa/messagebus/ireplyhandler.h> #include <vespa/messagebus/message.h> #include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/network/inetwork.h> #include <vespa/messagebus/network/iserviceaddress.h> #include <vespa/messagebus/reply.h> #include <string> @@ -20,6 +19,9 @@ namespace mbus { +class HopBlueprint; +class INetwork; + /** * This class represents a node in the routing tree that is created when a route * is resolved. There will be one node per modification of the route. For every diff --git a/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h b/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h index 12ab0f63485..dad7251de64 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h +++ b/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h @@ -1,12 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/messagebus/reply.h> #include "route.h" namespace mbus { class RoutingNode; +class Reply; /** * Implements an iterator for the child routing contexts of this. Use {@link @@ -66,7 +66,7 @@ public: * * @return The reply. */ - Reply::UP removeReply(); + std::unique_ptr<Reply> removeReply(); /** * Returns the reply of the current child. |