From 742fe9f527d4d6063b15c4a5a3b0baec14057830 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 26 Sep 2017 16:33:11 +0200 Subject: Do the sending in a thread pool too. --- messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 47ae831ef6a..1ea5654a632 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -86,8 +86,10 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.send(*this); - delete this; + _net.getExecutor().execute(vespalib::makeLambdaTask([this]() { + _net.send(*this); + delete this; + })); } } -- cgit v1.2.3 From a20954e6345e87e318ee8c9a646841ada7c61718 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 26 Sep 2017 17:03:42 +0200 Subject: Also reply in the thread pool. --- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 10 +++++++++- messagebus/src/vespa/messagebus/network/rpcsend.h | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 5d5611a45fa..931c87a7d91 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -15,6 +15,7 @@ #include using vespalib::make_string; +using vespalib::makeLambdaTask; namespace mbus { @@ -147,7 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - _net->getExecutor().execute(vespalib::makeLambdaTask([this, req]() { doRequestDone(req);})); + _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); } void @@ -219,6 +220,13 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { + _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { + doHandleReply(std::move(reply)); + })); +} + +void +RPCSend::doHandleReply(Reply::UP reply) { ReplyContext::UP ctx(static_cast(reply->getContext().value.PTR)); FRT_RPCRequest &req = ctx->getRequest(); string version = ctx->getVersion().toString(); diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index d1d720803ec..ec455aea7bd 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -84,6 +84,7 @@ public: private: void doRequest(FRT_RPCRequest *req); void doRequestDone(FRT_RPCRequest *req); + void doHandleReply(std::unique_ptr reply); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, -- cgit v1.2.3 From f230803c47febb2fcf62775d1d4ce0464fd1b74a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 26 Sep 2017 17:03:42 +0200 Subject: Also reply in the thread pool. --- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 931c87a7d91..881796ca200 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -250,7 +250,7 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - auto rejected = _net->getExecutor().execute(vespalib::makeLambdaTask([this, req]() { doRequest(req);})); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequest(req);})); assert(!rejected); } -- cgit v1.2.3 From fe5d41b5f89da44e7a4f8bf2e11fe1b1c2e4416f Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 27 Sep 2017 11:33:37 +0200 Subject: Assert that task is accepted. --- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 881796ca200..53f1bb8a70a 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,7 +148,8 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); + auto task = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); + assert(!task); } void -- cgit v1.2.3 From e2856b653042a28f2978fa32dab2fa84a455a384 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 27 Sep 2017 11:45:34 +0200 Subject: Move the redundant sync from shutdown to where it is required in sync. --- messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 1ea5654a632..a454f8382c0 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -378,13 +378,13 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); + _executor->sync(); task.await(); } void RPCNetwork::shutdown() { - _executor->sync(); _transport->ShutDown(false); _threadPool->Close(); _executor->shutdown(); -- cgit v1.2.3 From e5b560719a4d6e364cbcb52e200501786e907133 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 27 Sep 2017 12:17:39 +0200 Subject: Assert that we are not rejected. If we are there are a logic flaw somewhere.... --- messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 3 ++- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index a454f8382c0..b451a538161 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -86,10 +86,11 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.getExecutor().execute(vespalib::makeLambdaTask([this]() { + auto rejected = _net.getExecutor().execute(vespalib::makeLambdaTask([this]() { _net.send(*this); delete this; })); + assert (!rejected); } } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 53f1bb8a70a..bc1ac75e778 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,8 +148,8 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - auto task = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); - assert(!task); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); + assert(!rejected); } void @@ -221,9 +221,10 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { doHandleReply(std::move(reply)); })); + assert (!rejected); } void -- cgit v1.2.3 From f9f97b5913af6f967eabf5b5fd6f1279acee263a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 27 Sep 2017 14:08:30 +0200 Subject: Disable multiple threads until we have figured out ordering requirements from the distributor/content layer. --- messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 7 ++----- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 11 +++-------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index b451a538161..8ff7ac87edc 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -86,11 +86,8 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - auto rejected = _net.getExecutor().execute(vespalib::makeLambdaTask([this]() { - _net.send(*this); - delete this; - })); - assert (!rejected); + _net.send(*this); + delete this; } } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index bc1ac75e778..cc6b7086756 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,8 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequestDone(req);})); - assert(!rejected); + doRequestDone(req); } void @@ -221,10 +220,7 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { - doHandleReply(std::move(reply)); - })); - assert (!rejected); + doHandleReply(std::move(reply)); } void @@ -252,8 +248,7 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { doRequest(req);})); - assert(!rejected); + doRequest(req); } void -- cgit v1.2.3