summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-07-09 07:45:01 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-07-09 07:45:01 +0000
commit637de9a2313414da6930f703e2a63eae4637d0e6 (patch)
treee5b16eb1f9bc2bc3f138ad5e8148d3e00613a552 /messagebus
parent3af0dfbc0f011a70259424d446a8a9f1bd9b994b (diff)
Config control over what treads to skip.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/messenger/messenger.cpp17
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h2
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp22
-rw-r--r--messagebus/src/vespa/messagebus/messenger.h9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h24
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp2
9 files changed, 52 insertions, 35 deletions
diff --git a/messagebus/src/tests/messenger/messenger.cpp b/messagebus/src/tests/messenger/messenger.cpp
index 8133d24c186..0653ee52b0e 100644
--- a/messagebus/src/tests/messenger/messenger.cpp
+++ b/messagebus/src/tests/messenger/messenger.cpp
@@ -6,8 +6,6 @@
using namespace mbus;
-TEST_SETUP(Test);
-
class ThrowException : public Messenger::ITask {
public:
void run() override {
@@ -39,20 +37,17 @@ public:
}
};
-int
-Test::Main()
-{
- TEST_INIT("messenger_test");
+TEST("messenger_test") {
- Messenger msn;
+ Messenger msn(true, true);
msn.start();
vespalib::Barrier barrier(2);
- msn.enqueue(Messenger::ITask::UP(new ThrowException()));
- msn.enqueue(Messenger::ITask::UP(new BarrierTask(barrier)));
+ msn.enqueue(std::make_unique<ThrowException>());
+ msn.enqueue(std::make_unique<BarrierTask>(barrier));
barrier.await();
ASSERT_TRUE(msn.isEmpty());
-
- TEST_DONE();
}
+
+TEST_MAIN() { TEST_RUN_ALL(); } \ No newline at end of file
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index fee9504007b..c3d6b28b318 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -83,13 +83,13 @@ public:
namespace mbus {
-MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) :
+MessageBus::MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread) :
_network(net),
_lock(),
_routingTables(),
_sessions(),
_protocolRepository(std::make_unique<ProtocolRepository>()),
- _msn(std::make_unique<Messenger>()),
+ _msn(std::make_unique<Messenger>(skip_request_thread, skip_reply_thread)),
_resender(),
_maxPendingCount(0),
_maxPendingSize(0),
@@ -112,7 +112,7 @@ MessageBus::MessageBus(INetwork &net, const MessageBusParams &params) :
_routingTables(),
_sessions(),
_protocolRepository(std::make_unique<ProtocolRepository>()),
- _msn(std::make_unique<Messenger>()),
+ _msn(std::make_unique<Messenger>(true, true)),
_resender(),
_maxPendingCount(params.getMaxPendingCount()),
_maxPendingSize(params.getMaxPendingSize()),
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index f10898275b3..c0682967db9 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -83,7 +83,7 @@ public:
* @param network The network to associate with.
* @param protocols An array of protocols to register.
*/
- MessageBus(INetwork &net, ProtocolSet protocols);
+ MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread);
/**
* Constructs an instance of message bus. This requires a network object that it will associate with. This
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 5313c4adcbb..4579f7dec0e 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -156,15 +156,15 @@ public:
namespace mbus {
-Messenger::Messenger() :
+Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) :
_monitor(),
_pool(128000),
_children(),
_queue(),
- _closed(false)
-{
- // empty
-}
+ _closed(false),
+ _skip_request_thread(skip_request_thread),
+ _skip_reply_thread(skip_reply_thread)
+{}
Messenger::~Messenger()
{
@@ -246,13 +246,21 @@ Messenger::start()
void
Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler)
{
- enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
+ if (_skip_request_thread) {
+ handler.handleMessage(std::move(msg));
+ } else {
+ enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
+ }
}
void
Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
+ if (_skip_reply_thread) {
+ handler.handleReply(std::move(reply));
+ } else {
+ enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
+ }
}
void
diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h
index 62e5a14cf95..3103e9afae1 100644
--- a/messagebus/src/vespa/messagebus/messenger.h
+++ b/messagebus/src/vespa/messagebus/messenger.h
@@ -45,20 +45,19 @@ private:
std::vector<ITask*> _children;
vespalib::ArrayQueue<ITask*> _queue;
bool _closed;
+ const bool _skip_request_thread;
+ const bool _skip_reply_thread;
protected:
void Run(FastOS_ThreadInterface *thread, void *arg) override;
public:
- /**
- * Constructs a new messenger object.
- */
- Messenger();
+ Messenger(bool skip_request_thread, bool skip_reply_thread);
/**
* Frees any allocated resources. Also destroys all queued tasks.
*/
- ~Messenger();
+ ~Messenger() override;
/**
* Adds a recurrent task to this that is to be run for every iteration of
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5bf277a8ee6..01d1cd918a7 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -18,6 +18,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_tcpNoDelay(true),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
+ _skip_request_thread(false),
+ _skip_reply_thread(false),
_connectionExpireSecs(600),
_compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 140f81c611c..ddb4df1a3a3 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -23,6 +23,8 @@ private:
bool _tcpNoDelay;
bool _dispatchOnEncode;
bool _dispatchOnDecode;
+ bool _skip_request_thread;
+ bool _skip_reply_thread;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
@@ -71,10 +73,6 @@ public:
}
/**
- *
- */
-
- /**
* Returns the port to listen to.
*
* @return The port.
@@ -190,14 +188,28 @@ public:
return *this;
}
- uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; }
+ bool getDispatchOnDecode() const { return _dispatchOnDecode; }
RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) {
_dispatchOnEncode = dispatchOnEncode;
return *this;
}
- uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
+ bool getDispatchOnEncode() const { return _dispatchOnEncode; }
+
+ RPCNetworkParams &setSkipRequestThread(bool skip_request_thread) {
+ _skip_request_thread = skip_request_thread;
+ return *this;
+ }
+
+ bool getSkipRequestThread() const { return _skip_request_thread; }
+
+ RPCNetworkParams &setSkipReplyThread(bool skip_reply_thread) {
+ _skip_reply_thread = skip_reply_thread;
+ return *this;
+ }
+
+ bool getSkipReplyThread() const { return _skip_reply_thread; }
};
}
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
index 103c21ee3aa..29b73a4e730 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpcmessagebus.h"
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/config/subscription/configuri.h>
namespace mbus {
@@ -24,7 +25,7 @@ RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols,
const RPCNetworkParams &rpcParams,
const config::ConfigUri &routingCfgUri) :
_net(rpcParams),
- _bus(_net, protocols),
+ _bus(_net, protocols, rpcParams.getSkipRequestThread(), rpcParams.getSkipReplyThread()),
_agent(_bus),
_subscriber(routingCfgUri.getContext())
{
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index 8e7f138b886..79d2539213d 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -25,7 +25,7 @@ TestServer::TestServer(const Identity &ident,
const Slobrok &slobrok,
IProtocol::SP protocol) :
net(RPCNetworkParams(slobrok.config()).setIdentity(ident)),
- mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol))
+ mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol), true, true)
{
mb.setupRouting(spec);
}