summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@gmail.com>2017-09-30 17:29:01 +0200
committerGitHub <noreply@github.com>2017-09-30 17:29:01 +0200
commitc6255ad6f350853f0f2a454a3395704189cd4536 (patch)
tree7e083745a8e04c6621073fede79899552165dbf8
parent8d54a7cf27c6bbf754fed67be94dce4c391b2256 (diff)
parentebafb58f0f7c79912753665e228f61f29b999185 (diff)
Merge pull request #3600 from vespa-engine/revert-3594-balder/let-protocol-signal-sequencing-requirements
Revert "Balder/let protocol signal sequencing requirements"
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.h7
-rw-r--r--messagebus/src/tests/protocolrepository/protocolrepository.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/iprotocol.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h3
-rw-r--r--messagebus/src/vespa/messagebus/testlib/simpleprotocol.h1
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h4
7 files changed, 12 insertions, 24 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
index c3417d85197..b2d1456fd98 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
@@ -24,7 +24,7 @@ class SystemState;
class IRoutingPolicyFactory;
class IRoutableFactory;
-class DocumentProtocol final : public mbus::IProtocol {
+class DocumentProtocol : public mbus::IProtocol {
private:
std::unique_ptr<RoutingPolicyRepository> _routingPolicyRepository;
std::unique_ptr<RoutableRepository> _routableRepository;
@@ -264,7 +264,8 @@ public:
* @param buf A byte buffer that contains a serialized routable.
* @return The deserialized routable.
*/
- mbus::Routable::UP deserialize(uint32_t type, document::ByteBuffer &buf) const;
+ mbus::Routable::UP deserialize(uint32_t type,
+ document::ByteBuffer &buf) const;
/**
* This is a convenient entry to the {@link #merge(RoutingContext,std::set)} method by way of a routing
@@ -306,7 +307,7 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string &param) const override;
mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override;
mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override;
- bool requireSequencing() const override { return false; }
};
}
+
diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
index b6178449918..b2454eb272a 100644
--- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp
+++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
@@ -48,7 +48,6 @@ public:
(void)data;
throw std::exception();
}
- bool requireSequencing() const override { return false; }
};
int
diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h
index 8a4129d1976..40cfc779c36 100644
--- a/messagebus/src/vespa/messagebus/iprotocol.h
+++ b/messagebus/src/vespa/messagebus/iprotocol.h
@@ -52,7 +52,8 @@ public:
* @param param Ppolicy specific parameter.
* @return A newly created routing policy.
*/
- virtual IRoutingPolicy::UP createPolicy(const string &name, const string &param) const = 0;
+ virtual IRoutingPolicy::UP createPolicy(const string &name,
+ const string &param) const = 0;
/**
* Encodes the protocol specific data of a routable into a byte array.
@@ -79,9 +80,6 @@ public:
* @return The decoded routable.
*/
virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw()
-
-
- virtual bool requireSequencing() const = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 1ee791d5454..cc6b7086756 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -220,19 +220,11 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
- const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (protocol->requireSequencing()) {
- doHandleReply(protocol, std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
- doHandleReply(protocol, std::move(reply));
- }));
- assert (!rejected);
- }
+ doHandleReply(std::move(reply));
}
void
-RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) {
+RPCSend::doHandleReply(Reply::UP reply) {
ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
FRT_RPCRequest &req = ctx->getRequest();
string version = ctx->getVersion().toString();
@@ -242,7 +234,7 @@ RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) {
}
Blob payload(0);
if (reply->getType() != 0) {
- payload = protocol->encode(ctx->getVersion(), *reply);
+ payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
if (payload.size() == 0) {
reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index 11a042b91c0..ec455aea7bd 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -19,7 +19,6 @@ class Error;
class Route;
class Message;
class RPCServiceAddress;
-class IProtocol;
class PayLoadFiller
{
@@ -85,7 +84,7 @@ public:
private:
void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
- void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply);
+ void doHandleReply(std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
index 09e1ee9febe..8931e1b46f9 100644
--- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
+++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
@@ -72,7 +72,6 @@ public:
IRoutingPolicy::UP createPolicy(const string &name, const string &param) const override;
Blob encode(const vespalib::Version &version, const Routable &routable) const override;
Routable::UP decode(const vespalib::Version &version, BlobRef data) const override;
- virtual bool requireSequencing() const override { return false; }
};
} // namespace mbus
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
index 10289adaf1a..699f1c4c239 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -6,7 +6,7 @@
namespace storage::mbusprot {
-class StorageProtocol final : public mbus::IProtocol
+class StorageProtocol : public mbus::IProtocol
{
public:
typedef std::shared_ptr<StorageProtocol> SP;
@@ -20,7 +20,7 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string& name, const mbus::string& param) const override;
mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override;
mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override;
- virtual bool requireSequencing() const override { return true; }
+
private:
ProtocolSerialization5_0 _serializer5_0;
ProtocolSerialization5_1 _serializer5_1;