aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-30 18:19:52 +0200
committerGitHub <noreply@github.com>2017-09-30 18:19:52 +0200
commit19ac47a78d13646a4142688cb554f08b4a3400c3 (patch)
treeacc99da8ba90b16a657ba820607c10de95dc04dd /messagebus
parent8a02ebf681baddf7575818507ad5b701c862810b (diff)
Revert "Revert "Balder/let protocol signal sequencing requirements""
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/protocolrepository/protocolrepository.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/iprotocol.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h3
-rw-r--r--messagebus/src/vespa/messagebus/testlib/simpleprotocol.h1
5 files changed, 19 insertions, 6 deletions
diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
index b2454eb272a..b6178449918 100644
--- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp
+++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
@@ -48,6 +48,7 @@ public:
(void)data;
throw std::exception();
}
+ bool requireSequencing() const override { return false; }
};
int
diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h
index 40cfc779c36..8a4129d1976 100644
--- a/messagebus/src/vespa/messagebus/iprotocol.h
+++ b/messagebus/src/vespa/messagebus/iprotocol.h
@@ -52,8 +52,7 @@ public:
* @param param Ppolicy specific parameter.
* @return A newly created routing policy.
*/
- virtual IRoutingPolicy::UP createPolicy(const string &name,
- const string &param) const = 0;
+ virtual IRoutingPolicy::UP createPolicy(const string &name, const string &param) const = 0;
/**
* Encodes the protocol specific data of a routable into a byte array.
@@ -80,6 +79,9 @@ public:
* @return The decoded routable.
*/
virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw()
+
+
+ virtual bool requireSequencing() const = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index cc6b7086756..1ee791d5454 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -220,11 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
- doHandleReply(std::move(reply));
+ const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
+ if (protocol->requireSequencing()) {
+ doHandleReply(protocol, std::move(reply));
+ } else {
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ doHandleReply(protocol, std::move(reply));
+ }));
+ assert (!rejected);
+ }
}
void
-RPCSend::doHandleReply(Reply::UP reply) {
+RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) {
ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
FRT_RPCRequest &req = ctx->getRequest();
string version = ctx->getVersion().toString();
@@ -234,7 +242,7 @@ RPCSend::doHandleReply(Reply::UP reply) {
}
Blob payload(0);
if (reply->getType() != 0) {
- payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
+ payload = protocol->encode(ctx->getVersion(), *reply);
if (payload.size() == 0) {
reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index ec455aea7bd..11a042b91c0 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -19,6 +19,7 @@ class Error;
class Route;
class Message;
class RPCServiceAddress;
+class IProtocol;
class PayLoadFiller
{
@@ -84,7 +85,7 @@ public:
private:
void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
- void doHandleReply(std::unique_ptr<Reply> reply);
+ void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
index 8931e1b46f9..09e1ee9febe 100644
--- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
+++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
@@ -72,6 +72,7 @@ public:
IRoutingPolicy::UP createPolicy(const string &name, const string &param) const override;
Blob encode(const vespalib::Version &version, const Routable &routable) const override;
Routable::UP decode(const vespalib::Version &version, BlobRef data) const override;
+ virtual bool requireSequencing() const override { return false; }
};
} // namespace mbus