diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-19 14:06:57 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-20 10:16:58 +0000 |
commit | 86a245e6d0751dec7e38d64c4ea21ea7f4308239 (patch) | |
tree | 108cb5b7a96f96a0db35e95b13bf08e7dadad4d4 /messagebus | |
parent | 496e5cf57060fb068a9dc73ac7ff44333a60774e (diff) |
Use timeouts typed with unit.
Diffstat (limited to 'messagebus')
25 files changed, 101 insertions, 88 deletions
diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp index ebd8d6e250b..2f510d98ff1 100644 --- a/messagebus/src/tests/loadbalance/loadbalance.cpp +++ b/messagebus/src/tests/loadbalance/loadbalance.cpp @@ -16,6 +16,7 @@ #include <vespa/messagebus/testlib/testserver.h> using namespace mbus; +using namespace std::chrono_literals; struct Handler : public IMessageHandler { @@ -64,7 +65,7 @@ Test::Main() RoutableQueue queue; SourceSessionParams params; - params.setTimeout(30.0); + params.setTimeout(30s); params.setThrottlePolicy(IThrottlePolicy::SP()); SourceSession::UP ss = src.mb.createSourceSession(queue, params); diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp index d1b512bb1f1..5a4074f7f02 100644 --- a/messagebus/src/tests/messageordering/messageordering.cpp +++ b/messagebus/src/tests/messageordering/messageordering.cpp @@ -13,6 +13,7 @@ LOG_SETUP("messageordering_test"); using namespace mbus; +using namespace std::chrono_literals; TEST_SETUP(Test); @@ -151,10 +152,10 @@ Test::Main() SourceSessionParams ssp; ssp.setThrottlePolicy(IThrottlePolicy::SP()); - ssp.setTimeout(400); + ssp.setTimeout(400s); SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp); DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst); - ASSERT_EQUAL(400u, ssp.getTimeout()); + ASSERT_EQUAL(400.0, ssp.getTimeout().count()); // wait for slobrok registration ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session")); diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp index 18427a933cd..7c90a7f958f 100644 --- a/messagebus/src/tests/routable/routable.cpp +++ b/messagebus/src/tests/routable/routable.cpp @@ -11,6 +11,7 @@ #include <vespa/vespalib/testkit/testapp.h> using namespace mbus; +using namespace std::chrono_literals; TEST_SETUP(Test); @@ -26,14 +27,14 @@ Test::Main() foo.setRoute(fooRoute); foo.setRetry(1); foo.setTimeReceivedNow(); - foo.setTimeRemaining(2); + foo.setTimeRemaining(2ms); SimpleMessage bar("bar"); Route barRoute = Route::parse("bar"); bar.setRoute(barRoute); bar.setRetry(3); bar.setTimeReceivedNow(); - bar.setTimeRemaining(4); + bar.setTimeRemaining(4ms); foo.swapState(bar); EXPECT_EQUAL(barRoute.toString(), foo.getRoute().toString()); @@ -41,8 +42,8 @@ Test::Main() EXPECT_EQUAL(3u, foo.getRetry()); EXPECT_EQUAL(1u, bar.getRetry()); EXPECT_TRUE(foo.getTimeReceived() >= bar.getTimeReceived()); - EXPECT_EQUAL(4u, foo.getTimeRemaining()); - EXPECT_EQUAL(2u, bar.getTimeRemaining()); + EXPECT_EQUAL(4u, foo.getTimeRemaining().count()); + EXPECT_EQUAL(2u, bar.getTimeRemaining().count()); } { // Test reply swap state. @@ -73,7 +74,7 @@ Test::Main() msg.discard(); Reply::UP reply = handler.getReply(0); - ASSERT_TRUE(reply.get() == NULL); + ASSERT_FALSE(reply); } { // Test reply discard logic. @@ -86,7 +87,7 @@ Test::Main() reply.discard(); Reply::UP ap = handler.getReply(0); - ASSERT_TRUE(ap.get() == NULL); + ASSERT_FALSE(ap); } TEST_DONE(); diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp index 03a2c61a052..b2631e13d9c 100644 --- a/messagebus/src/tests/timeout/timeout.cpp +++ b/messagebus/src/tests/timeout/timeout.cpp @@ -13,6 +13,8 @@ #include <vespa/messagebus/testlib/testserver.h> using namespace mbus; +using namespace std::chrono_literals; + class Test : public vespalib::TestApp { public: @@ -42,7 +44,7 @@ Test::testZeroTimeout() TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok); Receptor srcHandler; - SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0)); + SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0s)); Receptor dstHandler; DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); @@ -50,7 +52,7 @@ Test::testZeroTimeout() ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted()); Reply::UP reply = srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); } @@ -63,19 +65,19 @@ Test::testMessageExpires() TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok); Receptor srcHandler, dstHandler; - SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1)); + SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1s)); DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted()); Reply::UP reply = srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); Message::UP msg = dstHandler.getMessage(1); - if (msg.get() != NULL) { + if (msg) { msg->discard(); } } diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h index 0e171b9f6d0..c8de001bd97 100644 --- a/messagebus/src/vespa/messagebus/common.h +++ b/messagebus/src/vespa/messagebus/common.h @@ -2,11 +2,15 @@ #pragma once #include <vespa/vespalib/stllike/string.h> +#include <chrono> namespace mbus { // Decide the type of string used once -typedef vespalib::string string; +using string = vespalib::string; + +using seconds = std::chrono::duration<double>; + } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp index 94ef718c404..af1d97334f6 100644 --- a/messagebus/src/vespa/messagebus/message.cpp +++ b/messagebus/src/vespa/messagebus/message.cpp @@ -62,15 +62,15 @@ Message::setTimeReceivedNow() return *this; } -uint64_t +steady_clock::time_point Message::getTimeReceived() const { - return duration_cast<milliseconds>(_timeReceived.time_since_epoch()).count(); + return _timeReceived; } -uint64_t +milliseconds Message::getTimeRemainingNow() const { - return std::max(0L, _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived).count()); + return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived)); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index 4d4c83b06bd..dce4d86450a 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -3,7 +3,6 @@ #include "routable.h" #include <vespa/messagebus/routing/route.h> -#include <memory> #include <chrono> namespace mbus { @@ -12,23 +11,11 @@ namespace mbus { * A Message is a question, a Reply is the answer. */ class Message : public Routable { -private: - using time_point = std::chrono::steady_clock::time_point; - Route _route; - time_point _timeReceived; - int64_t _timeRemaining; - bool _retryEnabled; - uint32_t _retry; - public: - /** - * Convenience typedef for an auto pointer to a Message object. - */ - typedef std::unique_ptr<Message> UP; + using time_point = std::chrono::steady_clock::time_point; + using milliseconds = std::chrono::milliseconds; + using UP = std::unique_ptr<Message>; - /** - * Constructs a new instance of this class. - */ Message(); Message(const Message &) = delete; Message(Message &&) = delete; @@ -51,7 +38,7 @@ public: * * @return The timestamp this was last seen. */ - uint64_t getTimeReceived() const; + time_point getTimeReceived() const; /** * This is a convenience method to call {@link #setTimeReceived(uint64_t)} @@ -69,7 +56,7 @@ public: * * @return The remaining time in milliseconds. */ - uint64_t getTimeRemaining() const { return _timeRemaining; } + milliseconds getTimeRemaining() const { return _timeRemaining; } /** * Sets the numer of milliseconds that remain before this message times @@ -79,7 +66,7 @@ public: * @param timeRemaining The number of milliseconds until expiration. * @return This, to allow chaining. */ - Message &setTimeRemaining(uint64_t timeRemaining) { _timeRemaining = timeRemaining; return *this; } + Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; } /** * Returns the number of milliseconds that remain right now before this @@ -93,7 +80,7 @@ public: * * @return The remaining time in milliseconds. */ - uint64_t getTimeRemainingNow() const; + milliseconds getTimeRemainingNow() const; /** * Access the route associated with this message. @@ -196,6 +183,12 @@ public: * @return This, to allow chaining. */ Message &setRetry(uint32_t retry) { _retry = retry; return *this; } +private: + Route _route; + time_point _timeReceived; + milliseconds _timeRemaining; + bool _retryEnabled; + uint32_t _retry; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index fd1ad2908c7..d277063a273 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -15,6 +15,7 @@ LOG_SETUP(".messagebus"); using vespalib::LockGuard; using vespalib::make_string; +using namespace std::chrono_literals; namespace { @@ -64,7 +65,7 @@ public: _gate(gate) { } - ~ShutdownTask() { + ~ShutdownTask() override { _gate.countDown(); } @@ -150,7 +151,7 @@ MessageBus::setup(const MessageBusParams ¶ms) if (!_network.start()) { throw vespalib::NetworkSetupFailureException("Failed to start network."); } - if (!_network.waitUntilReady(120)) { + if (!_network.waitUntilReady(120s)) { throw vespalib::NetworkSetupFailureException("Network failed to become ready in time."); } @@ -396,7 +397,7 @@ MessageBus::deliverReply(Reply::UP reply, IReplyHandler &handler) _msn->deliverReply(std::move(reply), handler); } -const string +string MessageBus::getConnectionSpec() const { return _network.getConnectionSpec(); diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index d00444b563a..f10898275b3 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -274,7 +274,7 @@ public: * * @return The connection string. */ - const string getConnectionSpec() const; + string getConnectionSpec() const; /** * Provide access to the underlying {@link Messenger} object. diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h index a22e9fe9e11..037298cf7c0 100644 --- a/messagebus/src/vespa/messagebus/network/inetwork.h +++ b/messagebus/src/vespa/messagebus/network/inetwork.h @@ -62,7 +62,7 @@ public: * @param seconds The timeout. * @return True if ready. */ - virtual bool waitUntilReady(double seconds) const = 0; + virtual bool waitUntilReady(seconds timeout) const = 0; /** * Register a session name with the network layer. This will make the diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e214a435a4e..19e33cb02fd 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -227,12 +227,12 @@ RPCNetwork::start() } bool -RPCNetwork::waitUntilReady(double seconds) const +RPCNetwork::waitUntilReady(seconds timeout) const { slobrok::api::SlobrokList brokerList; slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList); bool hasConfig = false; - for (uint32_t i = 0; i < seconds * 100; ++i) { + for (uint32_t i = 0; i < timeout.count() * 100; ++i) { if (configurator->poll()) { hasConfig = true; } @@ -242,10 +242,10 @@ RPCNetwork::waitUntilReady(double seconds) const std::this_thread::sleep_for(10ms); } if (! hasConfig) { - LOG(error, "failed to get config for slobroks in %d seconds", (int)seconds); + LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count()); } else if (! _mirror->ready()) { auto brokers = brokerList.logString(); - LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds); + LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count()); } return false; } @@ -322,7 +322,7 @@ void RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients) { SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self - double timeout = ctx._msg.getTimeRemainingNow() / 1000.0; + double timeout = ctx._msg.getTimeRemainingNow().count() / 1000.0; for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; @@ -373,13 +373,13 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str())); } else { - uint64_t timeRemaining = ctx._msg.getTimeRemainingNow(); + std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); RPCSendAdapter *adapter = getSendAdapter(ctx._version); if (adapter == nullptr) { replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION, make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str())); - } else if (timeRemaining == 0) { + } else if (timeRemaining == 0ms) { replyError(ctx, ErrorCode::TIMEOUT, "Aborting transmission because zero time remains."); } else if (payload.size() == 0) { replyError(ctx, ErrorCode::ENCODE_ERROR, diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 123987fea68..169bdd86dd9 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -219,7 +219,7 @@ public: void attach(INetworkOwner &owner) override; const string getConnectionSpec() const override; bool start() override; - bool waitUntilReady(double seconds) const override; + bool waitUntilReady(seconds timout) const override; void registerSession(const string &session) override; void unregisterSession(const string &session) override; bool allocServiceAddress(RoutingNode &recipient) override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 87c87173ec7..bed2847f3c6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -98,20 +98,20 @@ RPCSend::handleDiscard(Context ctx) } void -RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining) +RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining) { send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); } void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining) +RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining) { send(recipient, version, FillByCopy(payload), timeRemaining); } void RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & payload, uint64_t timeRemaining) + const PayLoadFiller & payload, milliseconds timeRemaining) { SendContext::UP ctx(new SendContext(recipient, timeRemaining)); RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index 051abff6d1b..e207f7c1812 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -38,13 +38,14 @@ class RPCSend : public RPCSendAdapter, public: class Params { public: + using milliseconds = std::chrono::milliseconds; virtual ~Params() {} virtual vespalib::Version getVersion() const = 0; virtual vespalib::stringref getProtocol() const = 0; virtual uint32_t getTraceLevel() const = 0; virtual bool useRetry() const = 0; virtual uint32_t getRetries() const = 0; - virtual uint64_t getRemainingTime() const = 0; + virtual milliseconds getRemainingTime() const = 0; virtual vespalib::stringref getRoute() const = 0; virtual vespalib::stringref getSession() const = 0; virtual BlobRef getPayload() const = 0; @@ -59,13 +60,13 @@ protected: Error & error, vespalib::TraceNode & rootTrace) const = 0; virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const = 0; + const PayLoadFiller &filler, milliseconds timeRemaining) const = 0; virtual const char * getReturnSpec() const = 0; virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0; virtual std::unique_ptr<Params> toParams(const FRT_Values ¶m) const = 0; void send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & filler, uint64_t timeRemaining); + const PayLoadFiller & filler, milliseconds timeRemaining); std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version, BlobRef payload, Error & error) const; /** @@ -89,9 +90,9 @@ private: void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, uint64_t timeRemaining) final override; + Blob payload, milliseconds timeRemaining) final override; void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, uint64_t timeRemaining) final override; + BlobRef payload, milliseconds timeRemaining) final override; void RequestDone(FRT_RPCRequest *req) final override; void handleReply(std::unique_ptr<Reply> reply) final override; }; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h index f5867e79856..6462bb0691e 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h @@ -16,13 +16,15 @@ private: double _timeout; public: - typedef std::unique_ptr<SendContext> UP; + using milliseconds = std::chrono::milliseconds; + using UP = std::unique_ptr<SendContext>; SendContext(const SendContext &) = delete; SendContext & operator = (const SendContext &) = delete; - SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining) - : _recipient(recipient), - _trace(recipient.getTrace().getLevel()), - _timeout(timeRemaining * 0.001) { } + SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining) + : _recipient(recipient), + _trace(recipient.getTrace().getLevel()), + _timeout(timeRemaining.count() * 0.001) + { } mbus::RoutingNode &getRecipient() { return _recipient; } mbus::Trace &getTrace() { return _trace; } double getTimeout() { return _timeout; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h index d06f3531ca5..9b841811bb2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h @@ -3,6 +3,7 @@ #include <vespa/messagebus/blobref.h> #include <vespa/vespalib/component/version.h> +#include <chrono> namespace mbus { @@ -19,6 +20,7 @@ class RPCSendAdapter protected: RPCSendAdapter() = default; public: + using milliseconds = std::chrono::milliseconds; RPCSendAdapter(const RPCSendAdapter &) = delete; RPCSendAdapter & operator = (const RPCSendAdapter &) = delete; /** @@ -42,7 +44,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, uint64_t timeRemaining) = 0; + BlobRef payload, milliseconds timeRemaining) = 0; /** * Performs the actual sending to the given recipient. @@ -53,7 +55,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, uint64_t timeRemaining) = 0; + Blob payload, milliseconds timeRemaining) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp index 376267b555c..e902aa20965 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp @@ -59,7 +59,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder) void RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const + const PayLoadFiller &filler, milliseconds timeRemaining) const { FRT_Values &args = *req.GetParams(); @@ -69,7 +69,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, args.AddString(address.getSessionName().c_str()); args.AddInt8(msg.getRetryEnabled() ? 1 : 0); args.AddInt32(msg.getRetry()); - args.AddInt64(timeRemaining); + args.AddInt64(timeRemaining.count()); args.AddString(msg.getProtocol().c_str()); filler.fill(args); args.AddInt32(traceLevel); @@ -85,7 +85,7 @@ public: uint32_t getTraceLevel() const override { return _args[8]._intval32; } bool useRetry() const override { return _args[3]._intval8 != 0; } uint32_t getRetries() const override { return _args[4]._intval32; } - uint64_t getRemainingTime() const override { return _args[5]._intval64; } + milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); } vespalib::Version getVersion() const override { return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 37f23335309..3265c304830 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; + const PayLoadFiller &filler, milliseconds timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp index 91a41a6a800..3e453bb60eb 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp @@ -101,7 +101,7 @@ private: void RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const + const PayLoadFiller &filler, milliseconds timeRemaining) const { FRT_Values &args = *req.GetParams(); req.SetMethodName(METHOD_NAME); @@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout root.setString(SESSION_F, address.getSessionName()); root.setBool(USERETRY_F, msg.getRetryEnabled()); root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMELEFT_F, timeRemaining); + root.setLong(TIMELEFT_F, timeRemaining.count()); root.setString(PROTOCOL_F, msg.getProtocol()); root.setLong(TRACELEVEL_F, traceLevel); filler.fill(BLOB_F, root); @@ -156,7 +156,7 @@ public: uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); } bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); } uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); } - uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); } + milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); } Version getVersion() const override { return Version(_slime.get()[VERSION_F].asString().make_stringref()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h index e793868d2aa..939c37c81b2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; + const PayLoadFiller &filler, milliseconds timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index 5385ebd8844..26c30d92375 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -74,7 +74,8 @@ Resender::scheduleRetry(RoutingNode &node) if (delay < 0) { delay = _retryPolicy->getRetryDelay(retry); } - if (msg.getTimeRemainingNow() * 0.001 - delay <= 0) { + milliseconds delayMS(static_cast<long>(delay * 1000)); + if (msg.getTimeRemainingNow() <= delayMS) { node.addError(ErrorCode::TIMEOUT, "Timeout exceeded by resender, giving up."); return false; } @@ -83,7 +84,7 @@ Resender::scheduleRetry(RoutingNode &node) TraceLevel::COMPONENT, vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); - _queue.push(Entry(steady_clock::now() + milliseconds(static_cast<long>(delay * 1000)), &node)); + _queue.push(Entry(steady_clock::now() + delayMS, &node)); return true; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 1f6758b92c3..41fe01625ae 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -9,6 +9,9 @@ using vespalib::make_string; +using namespace std::chrono_literals; +using namespace std::chrono; + namespace mbus { SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams ¶ms) @@ -71,8 +74,8 @@ Result SourceSession::send(Message::UP msg) { msg->setTimeReceivedNow(); - if (msg->getTimeRemaining() == 0) { - msg->setTimeRemaining((uint64_t)(_timeout * 1000)); + if (msg->getTimeRemaining() == 0ms) { + msg->setTimeRemaining(duration_cast<milliseconds>(_timeout)); } { vespalib::MonitorGuard guard(_monitor); @@ -145,7 +148,7 @@ SourceSession & SourceSession::setTimeout(double timeout) { vespalib::MonitorGuard guard(_monitor); - _timeout = timeout; + _timeout = seconds(timeout); return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 2ce8eea384c..31ebec3555e 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -27,7 +27,7 @@ private: Sequencer _sequencer; IReplyHandler &_replyHandler; IThrottlePolicy::SP _throttlePolicy; - double _timeout; + seconds _timeout; uint32_t _pendingCount; bool _closed; bool _done; @@ -53,7 +53,7 @@ public: * messages. After this method returns, messagebus will not invoke any handlers associated with this * session. **/ - virtual ~SourceSession(); + ~SourceSession() override; /** * This is a convenience function to assign a named route to the given message, and then pass it to the diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp index 51fe91562ae..e7a99f2a1de 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp @@ -24,14 +24,14 @@ SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy) return *this; } -double +seconds SourceSessionParams::getTimeout() const { return _timeout; } SourceSessionParams & -SourceSessionParams::setTimeout(double timeout) +SourceSessionParams::setTimeout(seconds timeout) { _timeout = timeout; return *this; diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h index 7c14ee4524d..bc4cad48ba5 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.h +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h @@ -3,10 +3,11 @@ #include "ireplyhandler.h" #include "ithrottlepolicy.h" +#include <chrono> namespace mbus { -/** + /** * To facilitate several configuration parameters to the {@link MessageBus#createSourceSession(ReplyHandler, * SourceSessionParams)}, all parameters are held by this class. This class has reasonable default values for each * parameter. @@ -18,7 +19,7 @@ class SourceSessionParams { private: IReplyHandler *_replyHandler; IThrottlePolicy::SP _throttlePolicy; - double _timeout; + seconds _timeout; public: /** @@ -46,14 +47,14 @@ public: * * @return The total timeout parameter. */ - double getTimeout() const; + seconds getTimeout() const; /** * Returns the number of seconds a message can spend trying to succeed. * * @return The timeout in seconds. */ - SourceSessionParams &setTimeout(double timeout); + SourceSessionParams &setTimeout(seconds timeout); /** * Returns whether or not a reply handler has been assigned to this. |