summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2016-12-20 14:41:39 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2016-12-20 15:50:29 +0100
commit51bc810507f2067ebd2646274d3cda5cb583a620 (patch)
treecbff1f6464262c2e6b3a622db93d044bab69a532 /messagebus
parent530e52e17a85836d58cac58c89b71c189c6f9873 (diff)
Further decouple some hpp files, config and the attributevector.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/routingspec/routingspec.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/context.h4
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp53
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h44
-rw-r--r--messagebus/src/vespa/messagebus/network/inetwork.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/inetworkowner.h13
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingcontext.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingcontext.h11
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.h4
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnodeiterator.h4
12 files changed, 80 insertions, 73 deletions
diff --git a/messagebus/src/tests/routingspec/routingspec.cpp b/messagebus/src/tests/routingspec/routingspec.cpp
index d5317dc3bb0..32d235c0a11 100644
--- a/messagebus/src/tests/routingspec/routingspec.cpp
+++ b/messagebus/src/tests/routingspec/routingspec.cpp
@@ -1,7 +1,4 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fastos/fastos.h>
-#include <vespa/log/log.h>
-LOG_SETUP("routingspec_test");
#include <vespa/config/config.h>
#include <vespa/messagebus/configagent.h>
@@ -9,6 +6,8 @@ LOG_SETUP("routingspec_test");
#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/config-messagebus.h>
+#include <vespa/config/helper/configgetter.hpp>
+
using namespace mbus;
using namespace messagebus;
diff --git a/messagebus/src/vespa/messagebus/context.h b/messagebus/src/vespa/messagebus/context.h
index 087d41f8b80..359f32a35ec 100644
--- a/messagebus/src/vespa/messagebus/context.h
+++ b/messagebus/src/vespa/messagebus/context.h
@@ -2,8 +2,8 @@
#pragma once
-#include <string.h>
-#include <stdint.h>
+#include <cstring>
+#include <cstdint>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index c9067ef0aa4..f5a30c86d45 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -1,14 +1,13 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/routing/routingnode.h>
-#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/vstringfmt.h>
#include "messagebus.h"
-#include "imessagehandler.h"
+#include "messenger.h"
#include "emptyreply.h"
#include "errorcode.h"
#include "sendproxy.h"
+#include "protocolrepository.h"
+#include <vespa/messagebus/network/inetwork.h>
+#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
LOG_SETUP(".messagebus");
@@ -61,9 +60,7 @@ public:
_msn(msn),
_done(done),
_gate(gate)
- {
- // empty
- }
+ { }
~ShutdownTask() {
_gate.countDown();
@@ -88,8 +85,8 @@ MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) :
_lock("mbus::MessageBus::_lock", false),
_routingTables(),
_sessions(),
- _protocolRepository(),
- _msn(),
+ _protocolRepository(std::make_unique<ProtocolRepository>()),
+ _msn(std::make_unique<Messenger>()),
_resender(),
_maxPendingCount(0),
_maxPendingSize(0),
@@ -111,8 +108,8 @@ MessageBus::MessageBus(INetwork &net, const MessageBusParams &params) :
_lock("mbus::MessageBus::_lock", false),
_routingTables(),
_sessions(),
- _protocolRepository(),
- _msn(),
+ _protocolRepository(std::make_unique<ProtocolRepository>()),
+ _msn(std::make_unique<Messenger>()),
_resender(),
_maxPendingCount(params.getMaxPendingCount()),
_maxPendingSize(params.getMaxPendingSize()),
@@ -126,14 +123,14 @@ MessageBus::~MessageBus()
{
// all sessions must have been destroyed prior to this,
// so no more traffic from clients
- _msn.discardRecurrentTasks(); // no more traffic from recurrent tasks
+ _msn->discardRecurrentTasks(); // no more traffic from recurrent tasks
_network.shutdown(); // no more traffic from network
bool done = false;
while (!done) {
vespalib::Gate gate;
- Messenger::ITask::UP task(new ShutdownTask(_network, _msn, done, gate));
- _msn.enqueue(std::move(task));
+ Messenger::ITask::UP task(new ShutdownTask(_network, *_msn, done, gate));
+ _msn->enqueue(std::move(task));
gate.await();
}
}
@@ -143,7 +140,7 @@ MessageBus::setup(const MessageBusParams &params)
{
// Add all known protocols to the repository.
for (uint32_t i = 0, len = params.getNumProtocols(); i < len; ++i) {
- _protocolRepository.putProtocol(params.getProtocol(i));
+ _protocolRepository->putProtocol(params.getProtocol(i));
}
// Attach and start network.
@@ -161,9 +158,9 @@ MessageBus::setup(const MessageBusParams &params)
_resender.reset(new Resender(retryPolicy));
Messenger::ITask::UP task(new ResenderTask(*_resender));
- _msn.addRecurrentTask(std::move(task));
+ _msn->addRecurrentTask(std::move(task));
}
- if (!_msn.start()) {
+ if (!_msn->start()) {
throw vespalib::NetworkSetupFailureException("Failed to start messenger.");
}
}
@@ -260,13 +257,13 @@ MessageBus::getRoutingPolicy(const string &protocolName,
const string &policyName,
const string &policyParam)
{
- return _protocolRepository.getRoutingPolicy(protocolName, policyName, policyParam);
+ return _protocolRepository->getRoutingPolicy(protocolName, policyName, policyParam);
}
void
MessageBus::sync()
{
- _msn.sync();
+ _msn->sync();
_network.sync(); // should not be necessary, as msn is intermediate
}
@@ -279,7 +276,7 @@ MessageBus::handleMessage(Message::UP msg)
return;
}
SendProxy &proxy = *(new SendProxy(*this, _network, _resender.get())); // deletes self
- _msn.deliverMessage(std::move(msg), proxy);
+ _msn->deliverMessage(std::move(msg), proxy);
}
bool
@@ -301,20 +298,20 @@ MessageBus::setupRouting(const RoutingSpec &spec)
LockGuard guard(_lock);
std::swap(_routingTables, rtm);
}
- _protocolRepository.clearPolicyCache();
+ _protocolRepository->clearPolicyCache();
return true;
}
IProtocol::SP
MessageBus::getProtocol(const string &name)
{
- return _protocolRepository.getProtocol(name);
+ return _protocolRepository->getProtocol(name);
}
IProtocol::SP
MessageBus::putProtocol(const IProtocol::SP & protocol)
{
- return _protocolRepository.putProtocol(protocol);
+ return _protocolRepository->putProtocol(protocol);
}
bool
@@ -373,16 +370,16 @@ MessageBus::deliverMessage(Message::UP msg, const string &session)
}
if (msgHandler == NULL) {
deliverError(std::move(msg), ErrorCode::UNKNOWN_SESSION,
- vespalib::make_vespa_string(
+ vespalib::make_string(
"Session '%s' does not exist.",
session.c_str()));
} else if (!checkPending(*msg)) {
deliverError(std::move(msg), ErrorCode::SESSION_BUSY,
- vespalib::make_vespa_string(
+ vespalib::make_string(
"Session '%s' is busy, try again later.",
session.c_str()));
} else {
- _msn.deliverMessage(std::move(msg), *msgHandler);
+ _msn->deliverMessage(std::move(msg), *msgHandler);
}
}
@@ -400,7 +397,7 @@ MessageBus::deliverError(Message::UP msg, uint32_t errCode, const string &errMsg
void
MessageBus::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- _msn.deliverReply(std::move(reply), handler);
+ _msn->deliverReply(std::move(reply), handler);
}
const string
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index 483915eee25..bc4a08123a4 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -1,27 +1,28 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <map>
-#include <vespa/messagebus/network/inetworkowner.h>
-#include <vespa/messagebus/routing/resender.h>
-#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/messagebus/routing/routingtable.h>
-#include <vespa/vespalib/util/sync.h>
#include "destinationsession.h"
#include "iconfighandler.h"
#include "idiscardhandler.h"
#include "intermediatesession.h"
#include "messagebusparams.h"
-#include "messenger.h"
#include "protocolset.h"
-#include "protocolrepository.h"
#include "sourcesession.h"
+#include <vespa/messagebus/network/inetworkowner.h>
+#include <vespa/messagebus/routing/routingspec.h>
+#include <vespa/vespalib/util/sync.h>
+#include <map>
#include <string>
#include <atomic>
namespace mbus {
class SendProxy;
+class Messenger;
+class Resender;
+class INetwork;
+class RoutingTable;
+class ProtocolRepository;
/**
* A MessageBus object combined with an INetwork implementation makes up the central part of a messagebus setup. It is
@@ -35,17 +36,18 @@ class MessageBus : public IMessageHandler,
public IReplyHandler
{
private:
- INetwork &_network;
- vespalib::Lock _lock;
- std::map<string, RoutingTable::SP> _routingTables;
- std::map<string, IMessageHandler*> _sessions;
- ProtocolRepository _protocolRepository;
- Messenger _msn;
- Resender::UP _resender;
- std::atomic<uint32_t> _maxPendingCount;
- std::atomic<uint32_t> _maxPendingSize;
- std::atomic<uint32_t> _pendingCount;
- std::atomic<uint32_t> _pendingSize;
+ using RoutingTableSP = std::shared_ptr<RoutingTable>;
+ INetwork &_network;
+ vespalib::Lock _lock;
+ std::map<string, RoutingTableSP> _routingTables;
+ std::map<string, IMessageHandler*> _sessions;
+ std::unique_ptr<ProtocolRepository> _protocolRepository;
+ std::unique_ptr<Messenger> _msn;
+ std::unique_ptr<Resender> _resender;
+ std::atomic<uint32_t> _maxPendingCount;
+ std::atomic<uint32_t> _maxPendingSize;
+ std::atomic<uint32_t> _pendingCount;
+ std::atomic<uint32_t> _pendingSize;
/**
* This method performs the common constructor tasks.
@@ -185,7 +187,7 @@ public:
* @return shared pointer to routing table
* @param protocol the protocol name
**/
- RoutingTable::SP getRoutingTable(const string &protocol);
+ RoutingTableSP getRoutingTable(const string &protocol);
/**
* Returns a routing policy that corresponds to the argument protocol name, policy name and policy parameter. This
@@ -279,7 +281,7 @@ public:
*
* @return The underlying {@link Messenger} object.
*/
- Messenger & getMessenger() { return _msn; }
+ Messenger & getMessenger() { return *_msn; }
// Implements IReplyHandler.
void handleReply(Reply::UP reply);
diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h
index b218a0689a2..5070f3c6ca3 100644
--- a/messagebus/src/vespa/messagebus/network/inetwork.h
+++ b/messagebus/src/vespa/messagebus/network/inetwork.h
@@ -2,12 +2,13 @@
#pragma once
#include <memory>
-#include <vespa/messagebus/routing/routingnode.h>
#include <vespa/slobrok/imirrorapi.h>
#include "inetworkowner.h"
namespace mbus {
+class RoutingNode;
+
/**
* This interface is used to hide away the implementation details of the network
* code from the rest of the messagebus implementation. The methods defined in
diff --git a/messagebus/src/vespa/messagebus/network/inetworkowner.h b/messagebus/src/vespa/messagebus/network/inetworkowner.h
index 493f121783f..8205b04817c 100644
--- a/messagebus/src/vespa/messagebus/network/inetworkowner.h
+++ b/messagebus/src/vespa/messagebus/network/inetworkowner.h
@@ -2,11 +2,14 @@
#pragma once
#include <memory>
-#include <vespa/messagebus/iprotocol.h>
-#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/messagebus/common.h>
namespace mbus {
+ class Reply;
+ class IProtocol;
+ class IReplyHandler;
+ class Message;
/**
* A network owner is the object that instantiates and uses a network. The API to send messages
* across the network is part of the Network interface, whereas this interface exposes the required
@@ -26,7 +29,7 @@ public:
* @param name The name of the protocol to return.
* @return The named protocol.
*/
- virtual IProtocol::SP getProtocol(const string &name) = 0;
+ virtual std::shared_ptr<IProtocol> getProtocol(const string &name) = 0;
/**
* All messages that arrive in the network layer is passed to its owner through this function.
@@ -34,7 +37,7 @@ public:
* @param message The message that just arrived from the network.
* @param session The name of the session that is the recipient of the request.
*/
- virtual void deliverMessage(Message::UP message, const string &session) = 0;
+ virtual void deliverMessage(std::unique_ptr<Message> message, const string &session) = 0;
/**
* All replies that arrive in the network layer is passed through this to unentangle it from the network thread.
@@ -42,7 +45,7 @@ public:
* @param reply The reply that just arrived from the network.
* @param handler The handler that is to receive the reply.
*/
- virtual void deliverReply(Reply::UP reply, IReplyHandler &handler) = 0;
+ virtual void deliverReply(std::unique_ptr<Reply> reply, IReplyHandler &handler) = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index b0101753452..a5caaa127c3 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -1,12 +1,11 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "inetworkowner.h"
#include "rpcnetwork.h"
-#include "rpcsendv1.h"
-#include "rpcservice.h"
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/iprotocol.h>
#include <vespa/messagebus/tracelevel.h>
+#include <vespa/messagebus/routing/routingnode.h>
#include <vespa/slobrok/sbregister.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/log/log.h>
diff --git a/messagebus/src/vespa/messagebus/routing/routingcontext.cpp b/messagebus/src/vespa/messagebus/routing/routingcontext.cpp
index 8732ed4fc08..f671890c0c1 100644
--- a/messagebus/src/vespa/messagebus/routing/routingcontext.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingcontext.cpp
@@ -1,6 +1,8 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "route.h"
#include "routingnode.h"
+#include "policydirective.h"
+#include <vespa/messagebus/network/inetwork.h>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/routing/routingcontext.h b/messagebus/src/vespa/messagebus/routing/routingcontext.h
index 9edcd5aa014..e2e2dcf0cca 100644
--- a/messagebus/src/vespa/messagebus/routing/routingcontext.h
+++ b/messagebus/src/vespa/messagebus/routing/routingcontext.h
@@ -1,17 +1,18 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "routingnodeiterator.h"
#include <vespa/messagebus/context.h>
-#include <vespa/messagebus/messagebus.h>
#include <vespa/slobrok/imirrorapi.h>
#include <set>
-#include "hop.h"
-#include "policydirective.h"
-#include "routingnodeiterator.h"
namespace mbus {
class RoutingNode;
+class PolicyDirective;
+class MessageBus;
+class Message;
+class Error;
/**
* This context object is what is seen by {@link RoutingPolicy} when doing both select() and merge(). It
@@ -192,7 +193,7 @@ public:
* @param reply The reply to set.
* @return This, to allow chaining.
*/
- RoutingContext &setReply(Reply::UP reply);
+ RoutingContext &setReply(std::unique_ptr<Reply> reply);
/**
* This is a convenience method to call {@link #setError(Error)}.
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
index 7bfb31c2e68..00922d76759 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
@@ -1,14 +1,15 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "routingnode.h"
#include "errordirective.h"
-#include "policydirective.h"
#include "routedirective.h"
+#include "routingtable.h"
+#include "policydirective.h"
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/tracelevel.h>
-#include <stack>
#include <vespa/vespalib/util/atomic.h>
-#include <vespa/vespalib/util/vstringfmt.h>
+#include <vespa/messagebus/network/inetwork.h>
+#include <stack>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h
index 92a4b835ba1..aa854dce5ef 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.h
@@ -6,7 +6,6 @@
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/network/inetwork.h>
#include <vespa/messagebus/network/iserviceaddress.h>
#include <vespa/messagebus/reply.h>
#include <string>
@@ -20,6 +19,9 @@
namespace mbus {
+class HopBlueprint;
+class INetwork;
+
/**
* This class represents a node in the routing tree that is created when a route
* is resolved. There will be one node per modification of the route. For every
diff --git a/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h b/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h
index 12ab0f63485..dad7251de64 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnodeiterator.h
@@ -1,12 +1,12 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/messagebus/reply.h>
#include "route.h"
namespace mbus {
class RoutingNode;
+class Reply;
/**
* Implements an iterator for the child routing contexts of this. Use {@link
@@ -66,7 +66,7 @@ public:
*
* @return The reply.
*/
- Reply::UP removeReply();
+ std::unique_ptr<Reply> removeReply();
/**
* Returns the reply of the current child.