summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-05 18:54:21 +0200
committerGitHub <noreply@github.com>2020-04-05 18:54:21 +0200
commit2e3c2c7828943a1fc4a70059d071c076db024a5f (patch)
tree29a8f9c61b71b75dae275d994772b6a83ac8d926
parent32249cc80d14752011bb1c242bbdecd6ba81ad67 (diff)
Revert "Revert "Revert "Balder/rearrange threads"""
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp40
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h7
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h33
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp25
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
9 files changed, 46 insertions, 91 deletions
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 36211d8ec38..5313c4adcbb 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -246,13 +246,13 @@ Messenger::start()
void
Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler)
{
- handler.handleMessage(std::move(msg));
+ enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
}
void
Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- handler.handleReply(std::move(reply));
+ enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index faa67b9bece..de3be2ffa01 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,8 +17,6 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
-#include <vespa/vespalib/util/singleexecutor.h>
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -80,17 +78,6 @@ struct TargetPoolTask : public FNET_Task {
}
};
-std::unique_ptr<vespalib::SyncableThreadExecutor>
-createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
- switch (optimizeFor) {
- case RPCNetworkParams::OptimizeFor::LATENCY:
- return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
- case RPCNetworkParams::OptimizeFor::THROUGHPUT:
- default:
- return std::make_unique<vespalib::SingleExecutor>(100);
- }
-}
-
}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
@@ -120,16 +107,8 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
if (shouldSend) {
- if (_net.allowDispatchForEncode()) {
- auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() {
- _net.send(*this);
- delete this;
- }));
- assert (!rejected);
- } else {
- _net.send(*this);
- delete this;
- }
+ _net.send(*this);
+ delete this;
}
}
@@ -137,7 +116,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
+ _transport(std::make_unique<FNET_Transport>()),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -147,8 +126,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
- _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
- _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -158,6 +136,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
{
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
_transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
+ _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
@@ -427,8 +406,7 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _singleEncodeExecutor->sync();
- _singleDecodeExecutor->sync();
+ _executor->sync();
task.await();
}
@@ -437,10 +415,8 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _singleEncodeExecutor->shutdown();
- _singleDecodeExecutor->shutdown();
- _singleEncodeExecutor->sync();
- _singleDecodeExecutor->sync();
+ _executor->shutdown();
+ _executor->sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a510aae9014..a8eb514387c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -65,8 +65,7 @@ private:
std::unique_ptr<RPCTargetPool> _targetPool;
std::unique_ptr<FNET_Task> _targetPoolTask;
std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
+ std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
@@ -74,6 +73,7 @@ private:
bool _allowDispatchForEncode;
bool _allowDispatchForDecode;
+
/**
* Resolves and assigns a service address for the given recipient using the
* given address. This is called by the {@link
@@ -224,8 +224,7 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
- vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
+ vespalib::Executor & getExecutor() const { return *_executor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 482a46b2564..5bf277a8ee6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(1),
- _optimizeFor(OptimizeFor::LATENCY),
+ _numThreads(4),
+ _tcpNoDelay(true),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
_connectionExpireSecs(600),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index a4b752f46d4..140f81c611c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -12,10 +12,21 @@ namespace mbus {
* held by this class. This class has reasonable default values for each parameter.
*/
class RPCNetworkParams {
-public:
- enum class OptimizeFor { LATENCY, THROUGHPUT};
+private:
using CompressionConfig = vespalib::compression::CompressionConfig;
+ Identity _identity;
+ config::ConfigUri _slobrokConfig;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ bool _tcpNoDelay;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
+public:
RPCNetworkParams();
RPCNetworkParams(config::ConfigUri configUri);
~RPCNetworkParams();
@@ -96,12 +107,12 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
- RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
- _optimizeFor = tcpNoDelay;
+ RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) {
+ _tcpNoDelay = tcpNoDelay;
return *this;
}
- OptimizeFor getOptimizeFor() const { return _optimizeFor; }
+ bool getTcpNoDelay() const { return _tcpNoDelay; }
/**
* Returns the number of seconds before an idle network connection expires.
@@ -187,18 +198,6 @@ public:
}
uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
-private:
- Identity _identity;
- config::ConfigUri _slobrokConfig;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- uint32_t _numThreads;
- OptimizeFor _optimizeFor;
- bool _dispatchOnEncode;
- bool _dispatchOnDecode;
- double _connectionExpireSecs;
- CompressionConfig _compressionConfig;
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index d217c7964d6..2422638dc05 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -148,14 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- if ( _net->allowDispatchForDecode()) {
- auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() {
- doRequestDone(req);
- }));
- assert (!rejected);
- } else {
- doRequestDone(req);
- }
+ doRequestDone(req);
}
void
@@ -228,13 +221,13 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (protocol && _net->allowDispatchForEncode()) {
- auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
+ doHandleReply(protocol, std::move(reply));
+ } else {
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));
assert (!rejected);
- } else {
- doHandleReply(protocol, std::move(reply));
}
}
@@ -273,13 +266,13 @@ RPCSend::invoke(FRT_RPCRequest *req)
vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
return;
}
- if (_net->allowDispatchForDecode()) {
- auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
+ doRequest(req, protocol, std::move(params));
+ } else {
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
doRequest(req, protocol, std::move(params));
}));
assert (!rejected);
- } else {
- doRequest(req, protocol, std::move(params));
}
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 33b3ec37c04..8f5b22aa7fa 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -29,11 +29,11 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600
-## Number of threads for network.
+## Number of threads for mbus threadpool
## Any value below 1 will be 1.
-mbus.num_threads int default=1
+mbus.num_threads int default=4
-mbus.optimize_for enum {LATENCY, THROUGHPUT} default = THROUGHPUT
+mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread
@@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true
## Enable to use above thread pool for decoding replies
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
-mbus.dispatch_on_decode bool default=true
+mbus.dispatch_on_decode bool default=false
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index e7d1f06bbd7..fa2b0cda018 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -25,7 +25,6 @@ LOG_SETUP(".communication.manager");
using vespalib::make_string;
using document::FixedBucketSpaces;
-using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
namespace storage {
@@ -282,17 +281,6 @@ struct PlaceHolderBucketResolver : public BucketResolver {
}
};
-mbus::RPCNetworkParams::OptimizeFor
-convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) {
- switch (optimizeFor) {
- case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY:
- return mbus::RPCNetworkParams::OptimizeFor::LATENCY;
- case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT:
- default:
- return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT;
- }
-}
-
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
@@ -427,7 +415,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setNumThreads(std::max(1, config->mbus.numThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
- params.setOptimizeFor(convert(config->mbus.optimizeFor));
+ params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY);
params.setIdentity(mbus::Identity(_component.getIdentity()));
if (config->mbusport != -1) {
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index a0ae4bf3b43..c08ad214768 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -116,7 +116,7 @@ private:
void process(const std::shared_ptr<api::StorageMessage>& msg);
- using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
+ using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig;
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);