aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-28 00:15:37 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-02 14:40:36 +0000
commit42d5ce33431512375f946ef91d45a451d3cc8a6b (patch)
tree2eb6d43081b2b88faacc1309829718673c1b1d77
parent47019081334e7a779eda4d241546e663ed8f87bc (diff)
Revert "Revert "Balder/rearrange threads""
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp40
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h7
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h33
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp25
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
9 files changed, 91 insertions, 46 deletions
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 5313c4adcbb..36211d8ec38 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -246,13 +246,13 @@ Messenger::start()
void
Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler)
{
- enqueue(std::make_unique<MessageTask>(std::move(msg), handler));
+ handler.handleMessage(std::move(msg));
}
void
Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
{
- enqueue(std::make_unique<ReplyTask>(std::move(reply), handler));
+ handler.handleReply(std::move(reply));
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index de3be2ffa01..faa67b9bece 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,6 +17,8 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -78,6 +80,17 @@ struct TargetPoolTask : public FNET_Task {
}
};
+std::unique_ptr<vespalib::SyncableThreadExecutor>
+createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case RPCNetworkParams::OptimizeFor::LATENCY:
+ return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
+ case RPCNetworkParams::OptimizeFor::THROUGHPUT:
+ default:
+ return std::make_unique<vespalib::SingleExecutor>(100);
+ }
+}
+
}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
@@ -107,8 +120,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
if (shouldSend) {
- _net.send(*this);
- delete this;
+ if (_net.allowDispatchForEncode()) {
+ auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() {
+ _net.send(*this);
+ delete this;
+ }));
+ assert (!rejected);
+ } else {
+ _net.send(*this);
+ delete this;
+ }
}
}
@@ -116,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -126,7 +147,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
+ _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
+ _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -136,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
{
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
_transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
- _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
@@ -406,7 +427,8 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _executor->sync();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
task.await();
}
@@ -415,8 +437,10 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown();
- _executor->sync();
+ _singleEncodeExecutor->shutdown();
+ _singleDecodeExecutor->shutdown();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a8eb514387c..a510aae9014 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -65,7 +65,8 @@ private:
std::unique_ptr<RPCTargetPool> _targetPool;
std::unique_ptr<FNET_Task> _targetPoolTask;
std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
@@ -73,7 +74,6 @@ private:
bool _allowDispatchForEncode;
bool _allowDispatchForDecode;
-
/**
* Resolves and assigns a service address for the given recipient using the
* given address. This is called by the {@link
@@ -224,7 +224,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor() const { return *_executor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5bf277a8ee6..482a46b2564 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(4),
- _tcpNoDelay(true),
+ _numThreads(1),
+ _optimizeFor(OptimizeFor::LATENCY),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
_connectionExpireSecs(600),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 140f81c611c..a4b752f46d4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -12,21 +12,10 @@ namespace mbus {
* held by this class. This class has reasonable default values for each parameter.
*/
class RPCNetworkParams {
-private:
+public:
+ enum class OptimizeFor { LATENCY, THROUGHPUT};
using CompressionConfig = vespalib::compression::CompressionConfig;
- Identity _identity;
- config::ConfigUri _slobrokConfig;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- uint32_t _numThreads;
- bool _tcpNoDelay;
- bool _dispatchOnEncode;
- bool _dispatchOnDecode;
- double _connectionExpireSecs;
- CompressionConfig _compressionConfig;
-public:
RPCNetworkParams();
RPCNetworkParams(config::ConfigUri configUri);
~RPCNetworkParams();
@@ -107,12 +96,12 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
- RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) {
- _tcpNoDelay = tcpNoDelay;
+ RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
+ _optimizeFor = tcpNoDelay;
return *this;
}
- bool getTcpNoDelay() const { return _tcpNoDelay; }
+ OptimizeFor getOptimizeFor() const { return _optimizeFor; }
/**
* Returns the number of seconds before an idle network connection expires.
@@ -198,6 +187,18 @@ public:
}
uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
+private:
+ Identity _identity;
+ config::ConfigUri _slobrokConfig;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ OptimizeFor _optimizeFor;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 2422638dc05..d217c7964d6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- doRequestDone(req);
+ if ( _net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() {
+ doRequestDone(req);
+ }));
+ assert (!rejected);
+ } else {
+ doRequestDone(req);
+ }
}
void
@@ -221,13 +228,13 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ if (protocol && _net->allowDispatchForEncode()) {
+ auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));
assert (!rejected);
+ } else {
+ doHandleReply(protocol, std::move(reply));
}
}
@@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req)
vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
return;
}
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ if (_net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
doRequest(req, protocol, std::move(params));
}));
assert (!rejected);
+ } else {
+ doRequest(req, protocol, std::move(params));
}
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 8f5b22aa7fa..33b3ec37c04 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -29,11 +29,11 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600
-## Number of threads for mbus threadpool
+## Number of threads for network.
## Any value below 1 will be 1.
-mbus.num_threads int default=4
+mbus.num_threads int default=1
-mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
+mbus.optimize_for enum {LATENCY, THROUGHPUT} default = THROUGHPUT
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread
@@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true
## Enable to use above thread pool for decoding replies
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
-mbus.dispatch_on_decode bool default=false
+mbus.dispatch_on_decode bool default=true
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index fa2b0cda018..e7d1f06bbd7 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -25,6 +25,7 @@ LOG_SETUP(".communication.manager");
using vespalib::make_string;
using document::FixedBucketSpaces;
+using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
namespace storage {
@@ -281,6 +282,17 @@ struct PlaceHolderBucketResolver : public BucketResolver {
}
};
+mbus::RPCNetworkParams::OptimizeFor
+convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY:
+ return mbus::RPCNetworkParams::OptimizeFor::LATENCY;
+ case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT:
+ default:
+ return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT;
+ }
+}
+
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
@@ -415,7 +427,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setNumThreads(std::max(1, config->mbus.numThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
- params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY);
+ params.setOptimizeFor(convert(config->mbus.optimizeFor));
params.setIdentity(mbus::Identity(_component.getIdentity()));
if (config->mbusport != -1) {
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index c08ad214768..a0ae4bf3b43 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -116,7 +116,7 @@ private:
void process(const std::shared_ptr<api::StorageMessage>& msg);
- using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig;
+ using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);