summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-29 07:58:08 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-06-29 07:58:08 +0000
commite60ef2ca182ac32903e5c2e379bf6eea9fe77a4a (patch)
tree303d9948b27baf82ead1823d0033a62c1ae66381
parent1dff254213b5977d1196e0160de82a582a623a72 (diff)
Always skip messenger and communicationmanager when you can.
-rw-r--r--messagebus/src/tests/messenger/messenger.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h2
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/messenger.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h14
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp24
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
10 files changed, 17 insertions, 61 deletions
diff --git a/messagebus/src/tests/messenger/messenger.cpp b/messagebus/src/tests/messenger/messenger.cpp
index 1ee34b386ae..acd4406a7dc 100644
--- a/messagebus/src/tests/messenger/messenger.cpp
+++ b/messagebus/src/tests/messenger/messenger.cpp
@@ -39,7 +39,7 @@ public:
TEST("messenger_test") {
- Messenger msn(true, true);
+ Messenger msn;
msn.start();
vespalib::Barrier barrier(2);
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index b4b29b84793..97be5955e9d 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -82,13 +82,13 @@ public:
namespace mbus {
-MessageBus::MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread) :
+MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) :
_network(net),
_lock(),
_routingTables(),
_sessions(),
_protocolRepository(std::make_unique<ProtocolRepository>()),
- _msn(std::make_unique<Messenger>(skip_request_thread, skip_reply_thread)),
+ _msn(std::make_unique<Messenger>()),
_resender(),
_maxPendingCount(0),
_maxPendingSize(0),
@@ -111,7 +111,7 @@ MessageBus::MessageBus(INetwork &net, const MessageBusParams &params) :
_routingTables(),
_sessions(),
_protocolRepository(std::make_unique<ProtocolRepository>()),
- _msn(std::make_unique<Messenger>(true, true)),
+ _msn(std::make_unique<Messenger>()),
_resender(),
_maxPendingCount(params.getMaxPendingCount()),
_maxPendingSize(params.getMaxPendingSize()),
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index d2e82835f3e..d270a0f3491 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -82,7 +82,7 @@ public:
* @param network The network to associate with.
* @param protocols An array of protocols to register.
*/
- MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread);
+ MessageBus(INetwork &net, ProtocolSet protocols);
/**
* Constructs an instance of message bus. This requires a network object that it will associate with. This
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 13ca5317148..1423876e95b 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -155,14 +155,12 @@ public:
namespace mbus {
-Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread)
+Messenger::Messenger()
: _lock(),
_pool(128000),
_children(),
_queue(),
- _closed(false),
- _skip_request_thread(skip_request_thread),
- _skip_reply_thread(skip_reply_thread)
+ _closed(false)
{}
Messenger::~Messenger()
@@ -246,21 +244,13 @@ Messenger::start()
void
Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler)
{
- if (_skip_request_thread) {
- handler.handleMessage(std::move(msg));
- } else {
- enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
- }
+ handler.handleMessage(std::move(msg));
}
void
Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- if (_skip_reply_thread) {
- handler.handleReply(std::move(reply));
- } else {
- enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
- }
+ handler.handleReply(std::move(reply));
}
void
diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h
index be4dbdb10d8..0d36e6006cb 100644
--- a/messagebus/src/vespa/messagebus/messenger.h
+++ b/messagebus/src/vespa/messagebus/messenger.h
@@ -42,17 +42,15 @@ private:
mutable std::mutex _lock;
std::condition_variable _cond;
FastOS_ThreadPool _pool;
- std::vector<ITask*> _children;
+ std::vector<ITask*> _children;
vespalib::ArrayQueue<ITask*> _queue;
bool _closed;
- const bool _skip_request_thread;
- const bool _skip_reply_thread;
protected:
void Run(FastOS_ThreadInterface *thread, void *arg) override;
public:
- Messenger(bool skip_request_thread, bool skip_reply_thread);
+ Messenger();
/**
* Frees any allocated resources. Also destroys all queued tasks.
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 01834074e6f..16b8b9b1570 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -218,20 +218,6 @@ public:
}
bool getDispatchOnEncode() const { return _dispatchOnEncode; }
-
- RPCNetworkParams &setSkipRequestThread(bool skip_request_thread) {
- _skip_request_thread = skip_request_thread;
- return *this;
- }
-
- bool getSkipRequestThread() const { return _skip_request_thread; }
-
- RPCNetworkParams &setSkipReplyThread(bool skip_reply_thread) {
- _skip_reply_thread = skip_reply_thread;
- return *this;
- }
-
- bool getSkipReplyThread() const { return _skip_reply_thread; }
};
}
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
index a79171f5af2..0e4b6673d20 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
@@ -25,7 +25,7 @@ RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols,
const RPCNetworkParams &rpcParams,
const config::ConfigUri &routingCfgUri) :
_net(rpcParams),
- _bus(_net, protocols, rpcParams.getSkipRequestThread(), rpcParams.getSkipReplyThread()),
+ _bus(_net, protocols),
_agent(_bus),
_subscriber(routingCfgUri.getContext())
{
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index 80d9eaefe29..d289c372fda 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -25,7 +25,7 @@ TestServer::TestServer(const Identity &ident,
const Slobrok &slobrok,
IProtocol::SP protocol) :
net(RPCNetworkParams(slobrok.config()).setIdentity(ident)),
- mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol), true, true)
+ mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol))
{
mb.setupRouting(spec);
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 975e9361072..5a3ebebcd9e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -49,7 +49,7 @@ void
CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply)
{
assert(reply);
- enqueue_or_process(reply);
+ process(reply);
}
namespace {
@@ -104,7 +104,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(docMsgPtr->steal_trace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr)));
- enqueue_or_process(std::move(cmd));
+ process(std::move(cmd));
} else if (protocolName == mbusprot::StorageProtocol::NAME) {
std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release()));
@@ -116,7 +116,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(storMsgPtr->steal_trace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
- enqueue_or_process(std::move(cmd));
+ process(std::move(cmd));
} else {
LOGBM(warning, "Received unsupported message type %d for protocol '%s'",
msg->getType(), msg->getProtocol().c_str());
@@ -268,8 +268,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
- _thread(),
- _skip_thread(false)
+ _thread()
{
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
_component.registerMetric(_metrics);
@@ -372,7 +371,6 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig
void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config)
{
// Only allow dynamic (live) reconfiguration of message bus limits.
- _skip_thread = config->skipThread;
if (_mbus) {
configureMessageBusLimits(*config);
if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
@@ -411,8 +409,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
90, config->mbus.compress.limit));
- params.setSkipRequestThread(config->mbus.skipRequestThread);
- params.setSkipReplyThread(config->mbus.skipReplyThread);
// Configure messagebus here as we for legacy reasons have
// config here.
@@ -472,18 +468,6 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
}
-void
-CommunicationManager::enqueue_or_process(std::shared_ptr<api::StorageMessage> msg)
-{
- assert(msg);
- if (_skip_thread.load(std::memory_order_relaxed)) {
- LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(msg);
- } else {
- dispatch_async(std::move(msg));
- }
-}
-
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
process(std::move(msg));
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 31c6fa00f0e..d52fb56cf20 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -121,10 +121,8 @@ private:
std::atomic<bool> _closed;
DocumentApiConverter _docApiConverter;
framework::Thread::UP _thread;
- std::atomic<bool> _skip_thread;
void updateMetrics(const MetricLockGuard &) override;
- void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg);
// Test needs access to configure() for live reconfig testing.
friend struct CommunicationManagerTest;