diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-07-09 07:45:01 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-07-09 07:45:01 +0000 |
commit | 637de9a2313414da6930f703e2a63eae4637d0e6 (patch) | |
tree | e5b16eb1f9bc2bc3f138ad5e8148d3e00613a552 | |
parent | 3af0dfbc0f011a70259424d446a8a9f1bd9b994b (diff) |
Config control over what treads to skip.
12 files changed, 98 insertions, 48 deletions
diff --git a/messagebus/src/tests/messenger/messenger.cpp b/messagebus/src/tests/messenger/messenger.cpp index 8133d24c186..0653ee52b0e 100644 --- a/messagebus/src/tests/messenger/messenger.cpp +++ b/messagebus/src/tests/messenger/messenger.cpp @@ -6,8 +6,6 @@ using namespace mbus; -TEST_SETUP(Test); - class ThrowException : public Messenger::ITask { public: void run() override { @@ -39,20 +37,17 @@ public: } }; -int -Test::Main() -{ - TEST_INIT("messenger_test"); +TEST("messenger_test") { - Messenger msn; + Messenger msn(true, true); msn.start(); vespalib::Barrier barrier(2); - msn.enqueue(Messenger::ITask::UP(new ThrowException())); - msn.enqueue(Messenger::ITask::UP(new BarrierTask(barrier))); + msn.enqueue(std::make_unique<ThrowException>()); + msn.enqueue(std::make_unique<BarrierTask>(barrier)); barrier.await(); ASSERT_TRUE(msn.isEmpty()); - - TEST_DONE(); } + +TEST_MAIN() { TEST_RUN_ALL(); }
\ No newline at end of file diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index fee9504007b..c3d6b28b318 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -83,13 +83,13 @@ public: namespace mbus { -MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) : +MessageBus::MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread) : _network(net), _lock(), _routingTables(), _sessions(), _protocolRepository(std::make_unique<ProtocolRepository>()), - _msn(std::make_unique<Messenger>()), + _msn(std::make_unique<Messenger>(skip_request_thread, skip_reply_thread)), _resender(), _maxPendingCount(0), _maxPendingSize(0), @@ -112,7 +112,7 @@ MessageBus::MessageBus(INetwork &net, const MessageBusParams ¶ms) : _routingTables(), _sessions(), _protocolRepository(std::make_unique<ProtocolRepository>()), - _msn(std::make_unique<Messenger>()), + _msn(std::make_unique<Messenger>(true, true)), _resender(), _maxPendingCount(params.getMaxPendingCount()), _maxPendingSize(params.getMaxPendingSize()), diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index f10898275b3..c0682967db9 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -83,7 +83,7 @@ public: * @param network The network to associate with. * @param protocols An array of protocols to register. */ - MessageBus(INetwork &net, ProtocolSet protocols); + MessageBus(INetwork &net, ProtocolSet protocols, bool skip_request_thread, bool skip_reply_thread); /** * Constructs an instance of message bus. This requires a network object that it will associate with. This diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 5313c4adcbb..4579f7dec0e 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -156,15 +156,15 @@ public: namespace mbus { -Messenger::Messenger() : +Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) : _monitor(), _pool(128000), _children(), _queue(), - _closed(false) -{ - // empty -} + _closed(false), + _skip_request_thread(skip_request_thread), + _skip_reply_thread(skip_reply_thread) +{} Messenger::~Messenger() { @@ -246,13 +246,21 @@ Messenger::start() void Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler) { - enqueue(std::make_unique<MessageTask>(std::move(msg), handler)); + if (_skip_request_thread) { + handler.handleMessage(std::move(msg)); + } else { + enqueue(std::make_unique<MessageTask>(std::move(msg), handler)); + } } void Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) { - enqueue(std::make_unique<ReplyTask>(std::move(reply), handler)); + if (_skip_reply_thread) { + handler.handleReply(std::move(reply)); + } else { + enqueue(std::make_unique<ReplyTask>(std::move(reply), handler)); + } } void diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h index 62e5a14cf95..3103e9afae1 100644 --- a/messagebus/src/vespa/messagebus/messenger.h +++ b/messagebus/src/vespa/messagebus/messenger.h @@ -45,20 +45,19 @@ private: std::vector<ITask*> _children; vespalib::ArrayQueue<ITask*> _queue; bool _closed; + const bool _skip_request_thread; + const bool _skip_reply_thread; protected: void Run(FastOS_ThreadInterface *thread, void *arg) override; public: - /** - * Constructs a new messenger object. - */ - Messenger(); + Messenger(bool skip_request_thread, bool skip_reply_thread); /** * Frees any allocated resources. Also destroys all queued tasks. */ - ~Messenger(); + ~Messenger() override; /** * Adds a recurrent task to this that is to be run for every iteration of diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5bf277a8ee6..01d1cd918a7 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -18,6 +18,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _tcpNoDelay(true), _dispatchOnEncode(true), _dispatchOnDecode(false), + _skip_request_thread(false), + _skip_reply_thread(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 140f81c611c..ddb4df1a3a3 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -23,6 +23,8 @@ private: bool _tcpNoDelay; bool _dispatchOnEncode; bool _dispatchOnDecode; + bool _skip_request_thread; + bool _skip_reply_thread; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -71,10 +73,6 @@ public: } /** - * - */ - - /** * Returns the port to listen to. * * @return The port. @@ -190,14 +188,28 @@ public: return *this; } - uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; } + bool getDispatchOnDecode() const { return _dispatchOnDecode; } RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { _dispatchOnEncode = dispatchOnEncode; return *this; } - uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } + bool getDispatchOnEncode() const { return _dispatchOnEncode; } + + RPCNetworkParams &setSkipRequestThread(bool skip_request_thread) { + _skip_request_thread = skip_request_thread; + return *this; + } + + bool getSkipRequestThread() const { return _skip_request_thread; } + + RPCNetworkParams &setSkipReplyThread(bool skip_reply_thread) { + _skip_reply_thread = skip_reply_thread; + return *this; + } + + bool getSkipReplyThread() const { return _skip_reply_thread; } }; } diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp index 103c21ee3aa..29b73a4e730 100644 --- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp +++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpcmessagebus.h" +#include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/config/subscription/configuri.h> namespace mbus { @@ -24,7 +25,7 @@ RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols, const RPCNetworkParams &rpcParams, const config::ConfigUri &routingCfgUri) : _net(rpcParams), - _bus(_net, protocols), + _bus(_net, protocols, rpcParams.getSkipRequestThread(), rpcParams.getSkipReplyThread()), _agent(_bus), _subscriber(routingCfgUri.getContext()) { diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp index 8e7f138b886..79d2539213d 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp +++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp @@ -25,7 +25,7 @@ TestServer::TestServer(const Identity &ident, const Slobrok &slobrok, IProtocol::SP protocol) : net(RPCNetworkParams(slobrok.config()).setIdentity(ident)), - mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol)) + mb(net, ProtocolSet().add(std::make_shared<SimpleProtocol>()).add(protocol), true, true) { mb.setupRouting(spec); } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index c855a4e683a..3e4b1fd6515 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -43,3 +43,15 @@ mbus.dispatch_on_encode bool default=true ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. mbus.dispatch_on_decode bool default=false + +## Skip messenger thread on reply +## Experimental +mbus.skip_reply_thread bool default=false + +## Skip messenger thread on reply +## Experimental +mbus.skip_request_thread bool default=false + +## Skip communication manager thread on mbus requests +## Experimental +skip_thread bool default=false diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index fda672e1ee4..5588b6535e2 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -14,16 +14,17 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/bufferedlogger.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> + LOG_SETUP(".communication.manager"); using vespalib::make_string; using document::FixedBucketSpaces; -using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; namespace storage { @@ -44,8 +45,8 @@ StorageTransportContext::~StorageTransportContext() = default; void CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) { - assert(reply.get()); - enqueue(reply); + assert(reply); + optionalEnqueue(reply); } namespace { @@ -100,7 +101,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) cmd->setTrace(docMsgPtr->getTrace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr))); - enqueue(std::move(cmd)); + optionalEnqueue(std::move(cmd)); } else if (protocolName == mbusprot::StorageProtocol::NAME) { std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release())); @@ -112,7 +113,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) cmd->setTrace(storMsgPtr->getTrace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr))); - enqueue(std::move(cmd)); + optionalEnqueue(std::move(cmd)); } else { LOGBM(warning, "Received unsupported message type %d for protocol '%s'", msg->getType(), msg->getProtocol().c_str()); @@ -253,7 +254,9 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _count(0), _configUri(configUri), _closed(false), - _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()) + _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), + _thread(), + _skip_thread(false) { _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); _component.registerMetric(_metrics); @@ -351,6 +354,7 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) { // Only allow dynamic (live) reconfiguration of message bus limits. + _skip_thread = config->skipThread; if (_mbus) { configureMessageBusLimits(*config); if (_mbus->getRPCNetwork().getPort() != config->mbusport) { @@ -387,6 +391,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str()); params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level, 90, config->mbus.compress.limit)); + params.setSkipRequestThread(config->mbus.skipRequestThread); + params.setSkipReplyThread(config->mbus.skipReplyThread); + // Configure messagebus here as we for legacy reasons have // config here. _mbus = std::make_unique<mbus::RPCMessageBus>( @@ -437,11 +444,23 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } void +CommunicationManager::optionalEnqueue(std::shared_ptr<api::StorageMessage> msg) +{ + assert(msg); + if (_skip_thread) { + LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + process(msg); + } else { + enqueue(std::move(msg)); + } +} + +void CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(msg); + LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + _eventQueue.enqueue(std::move(msg)); } bool diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 1d64c8a8911..3fd82f3509d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -108,12 +108,14 @@ private: vespalib::Lock _messageBusSentLock; std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent; - config::ConfigUri _configUri; - std::atomic<bool> _closed; - DocumentApiConverter _docApiConverter; + config::ConfigUri _configUri; + std::atomic<bool> _closed; + DocumentApiConverter _docApiConverter; framework::Thread::UP _thread; + bool _skip_thread; void updateMetrics(const MetricLockGuard &) override; + void optionalEnqueue(std::shared_ptr<api::StorageMessage> msg); // Test needs access to configure() for live reconfig testing. friend struct CommunicationManagerTest; |