From ebafb58f0f7c79912753665e228f61f29b999185 Mon Sep 17 00:00:00 2001 From: Arnstein Ressem Date: Sat, 30 Sep 2017 17:28:37 +0200 Subject: Revert "Balder/let protocol signal sequencing requirements" --- .../src/vespa/documentapi/messagebus/documentprotocol.h | 7 ++++--- .../src/tests/protocolrepository/protocolrepository.cpp | 1 - messagebus/src/vespa/messagebus/iprotocol.h | 6 ++---- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 14 +++----------- messagebus/src/vespa/messagebus/network/rpcsend.h | 3 +-- messagebus/src/vespa/messagebus/testlib/simpleprotocol.h | 1 - storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h | 4 ++-- 7 files changed, 12 insertions(+), 24 deletions(-) diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h index c3417d85197..b2d1456fd98 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h @@ -24,7 +24,7 @@ class SystemState; class IRoutingPolicyFactory; class IRoutableFactory; -class DocumentProtocol final : public mbus::IProtocol { +class DocumentProtocol : public mbus::IProtocol { private: std::unique_ptr _routingPolicyRepository; std::unique_ptr _routableRepository; @@ -264,7 +264,8 @@ public: * @param buf A byte buffer that contains a serialized routable. * @return The deserialized routable. */ - mbus::Routable::UP deserialize(uint32_t type, document::ByteBuffer &buf) const; + mbus::Routable::UP deserialize(uint32_t type, + document::ByteBuffer &buf) const; /** * This is a convenient entry to the {@link #merge(RoutingContext,std::set)} method by way of a routing @@ -306,7 +307,7 @@ public: mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string ¶m) const override; mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override; mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override; - bool requireSequencing() const override { return false; } }; } + diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp index b6178449918..b2454eb272a 100644 --- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp +++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp @@ -48,7 +48,6 @@ public: (void)data; throw std::exception(); } - bool requireSequencing() const override { return false; } }; int diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h index 8a4129d1976..40cfc779c36 100644 --- a/messagebus/src/vespa/messagebus/iprotocol.h +++ b/messagebus/src/vespa/messagebus/iprotocol.h @@ -52,7 +52,8 @@ public: * @param param Ppolicy specific parameter. * @return A newly created routing policy. */ - virtual IRoutingPolicy::UP createPolicy(const string &name, const string ¶m) const = 0; + virtual IRoutingPolicy::UP createPolicy(const string &name, + const string ¶m) const = 0; /** * Encodes the protocol specific data of a routable into a byte array. @@ -79,9 +80,6 @@ public: * @return The decoded routable. */ virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw() - - - virtual bool requireSequencing() const = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 1ee791d5454..cc6b7086756 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -220,19 +220,11 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (protocol->requireSequencing()) { - doHandleReply(protocol, std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { - doHandleReply(protocol, std::move(reply)); - })); - assert (!rejected); - } + doHandleReply(std::move(reply)); } void -RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) { +RPCSend::doHandleReply(Reply::UP reply) { ReplyContext::UP ctx(static_cast(reply->getContext().value.PTR)); FRT_RPCRequest &req = ctx->getRequest(); string version = ctx->getVersion().toString(); @@ -242,7 +234,7 @@ RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) { } Blob payload(0); if (reply->getType() != 0) { - payload = protocol->encode(ctx->getVersion(), *reply); + payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply); if (payload.size() == 0) { reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log.")); } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index 11a042b91c0..ec455aea7bd 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -19,7 +19,6 @@ class Error; class Route; class Message; class RPCServiceAddress; -class IProtocol; class PayLoadFiller { @@ -85,7 +84,7 @@ public: private: void doRequest(FRT_RPCRequest *req); void doRequestDone(FRT_RPCRequest *req); - void doHandleReply(const IProtocol * protocol, std::unique_ptr reply); + void doHandleReply(std::unique_ptr reply); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h index 09e1ee9febe..8931e1b46f9 100644 --- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h +++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h @@ -72,7 +72,6 @@ public: IRoutingPolicy::UP createPolicy(const string &name, const string ¶m) const override; Blob encode(const vespalib::Version &version, const Routable &routable) const override; Routable::UP decode(const vespalib::Version &version, BlobRef data) const override; - virtual bool requireSequencing() const override { return false; } }; } // namespace mbus diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h index 10289adaf1a..699f1c4c239 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h @@ -6,7 +6,7 @@ namespace storage::mbusprot { -class StorageProtocol final : public mbus::IProtocol +class StorageProtocol : public mbus::IProtocol { public: typedef std::shared_ptr SP; @@ -20,7 +20,7 @@ public: mbus::IRoutingPolicy::UP createPolicy(const mbus::string& name, const mbus::string& param) const override; mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override; mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override; - virtual bool requireSequencing() const override { return true; } + private: ProtocolSerialization5_0 _serializer5_0; ProtocolSerialization5_1 _serializer5_1; -- cgit v1.2.3