aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-27 14:15:14 +0200
committerGitHub <noreply@github.com>2017-09-27 14:15:14 +0200
commit34dcac22b79b0e0bce4a3e4108fd2cb99555ddc1 (patch)
treec9987fda7cfd695a9f9e999027e0eae625d7a1a7 /messagebus
parent12694e5879ae0261d1d92e81f7cb900d31a4a051 (diff)
parentf9f97b5913af6f967eabf5b5fd6f1279acee263a (diff)
Merge pull request #3551 from vespa-engine/balder/also-send-in-thread-pool-2
Balder/also send in thread pool 2
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp11
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h1
3 files changed, 11 insertions, 5 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 47ae831ef6a..8ff7ac87edc 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -15,7 +15,7 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
@@ -376,13 +376,13 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
+ _executor->sync();
task.await();
}
void
RPCNetwork::shutdown()
{
- _executor->sync();
_transport->ShutDown(false);
_threadPool->Close();
_executor->shutdown();
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 5d5611a45fa..cc6b7086756 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -15,6 +15,7 @@
#include <vespa/vespalib/data/slime/cursor.h>
using vespalib::make_string;
+using vespalib::makeLambdaTask;
namespace mbus {
@@ -147,7 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- _net->getExecutor().execute(vespalib::makeLambdaTask([this, req]() { doRequestDone(req);}));
+ doRequestDone(req);
}
void
@@ -219,6 +220,11 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
+ doHandleReply(std::move(reply));
+}
+
+void
+RPCSend::doHandleReply(Reply::UP reply) {
ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
FRT_RPCRequest &req = ctx->getRequest();
string version = ctx->getVersion().toString();
@@ -242,8 +248,7 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
- auto rejected = _net->getExecutor().execute(vespalib::makeLambdaTask([this, req]() { doRequest(req);}));
- assert(!rejected);
+ doRequest(req);
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index d1d720803ec..ec455aea7bd 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -84,6 +84,7 @@ public:
private:
void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
+ void doHandleReply(std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,