diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-19 14:06:57 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-20 10:16:58 +0000 |
commit | 86a245e6d0751dec7e38d64c4ea21ea7f4308239 (patch) | |
tree | 108cb5b7a96f96a0db35e95b13bf08e7dadad4d4 | |
parent | 496e5cf57060fb068a9dc73ac7ff44333a60774e (diff) |
Use timeouts typed with unit.
32 files changed, 125 insertions, 108 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 508d0a961f5..93c5d51fef5 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -40,6 +40,8 @@ using namespace documentapi; using vespalib::make_string; using std::make_unique; using std::make_shared; +using namespace std::chrono_literals; + class Test : public vespalib::TestApp { private: @@ -308,7 +310,7 @@ Test::testExternSend() mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor sr; - mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60)); + mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60s)); mbus::Slobrok slobrok; mbus::TestServer itr(mbus::Identity("itr"), mbus::RoutingSpec() @@ -349,7 +351,7 @@ Test::testExternMultipleSlobroks() mbus::TestServer src(mbus::Identity("src"), mbus::RoutingSpec(), local, std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::Receptor sr; - mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60)); + mbus::SourceSession::UP ss = src.mb.createSourceSession(sr, mbus::SourceSessionParams().setTimeout(60s)); string spec; mbus::Receptor dr; diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp index ebd8d6e250b..2f510d98ff1 100644 --- a/messagebus/src/tests/loadbalance/loadbalance.cpp +++ b/messagebus/src/tests/loadbalance/loadbalance.cpp @@ -16,6 +16,7 @@ #include <vespa/messagebus/testlib/testserver.h> using namespace mbus; +using namespace std::chrono_literals; struct Handler : public IMessageHandler { @@ -64,7 +65,7 @@ Test::Main() RoutableQueue queue; SourceSessionParams params; - params.setTimeout(30.0); + params.setTimeout(30s); params.setThrottlePolicy(IThrottlePolicy::SP()); SourceSession::UP ss = src.mb.createSourceSession(queue, params); diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp index d1b512bb1f1..5a4074f7f02 100644 --- a/messagebus/src/tests/messageordering/messageordering.cpp +++ b/messagebus/src/tests/messageordering/messageordering.cpp @@ -13,6 +13,7 @@ LOG_SETUP("messageordering_test"); using namespace mbus; +using namespace std::chrono_literals; TEST_SETUP(Test); @@ -151,10 +152,10 @@ Test::Main() SourceSessionParams ssp; ssp.setThrottlePolicy(IThrottlePolicy::SP()); - ssp.setTimeout(400); + ssp.setTimeout(400s); SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp); DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst); - ASSERT_EQUAL(400u, ssp.getTimeout()); + ASSERT_EQUAL(400.0, ssp.getTimeout().count()); // wait for slobrok registration ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session")); diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp index 18427a933cd..7c90a7f958f 100644 --- a/messagebus/src/tests/routable/routable.cpp +++ b/messagebus/src/tests/routable/routable.cpp @@ -11,6 +11,7 @@ #include <vespa/vespalib/testkit/testapp.h> using namespace mbus; +using namespace std::chrono_literals; TEST_SETUP(Test); @@ -26,14 +27,14 @@ Test::Main() foo.setRoute(fooRoute); foo.setRetry(1); foo.setTimeReceivedNow(); - foo.setTimeRemaining(2); + foo.setTimeRemaining(2ms); SimpleMessage bar("bar"); Route barRoute = Route::parse("bar"); bar.setRoute(barRoute); bar.setRetry(3); bar.setTimeReceivedNow(); - bar.setTimeRemaining(4); + bar.setTimeRemaining(4ms); foo.swapState(bar); EXPECT_EQUAL(barRoute.toString(), foo.getRoute().toString()); @@ -41,8 +42,8 @@ Test::Main() EXPECT_EQUAL(3u, foo.getRetry()); EXPECT_EQUAL(1u, bar.getRetry()); EXPECT_TRUE(foo.getTimeReceived() >= bar.getTimeReceived()); - EXPECT_EQUAL(4u, foo.getTimeRemaining()); - EXPECT_EQUAL(2u, bar.getTimeRemaining()); + EXPECT_EQUAL(4u, foo.getTimeRemaining().count()); + EXPECT_EQUAL(2u, bar.getTimeRemaining().count()); } { // Test reply swap state. @@ -73,7 +74,7 @@ Test::Main() msg.discard(); Reply::UP reply = handler.getReply(0); - ASSERT_TRUE(reply.get() == NULL); + ASSERT_FALSE(reply); } { // Test reply discard logic. @@ -86,7 +87,7 @@ Test::Main() reply.discard(); Reply::UP ap = handler.getReply(0); - ASSERT_TRUE(ap.get() == NULL); + ASSERT_FALSE(ap); } TEST_DONE(); diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp index 03a2c61a052..b2631e13d9c 100644 --- a/messagebus/src/tests/timeout/timeout.cpp +++ b/messagebus/src/tests/timeout/timeout.cpp @@ -13,6 +13,8 @@ #include <vespa/messagebus/testlib/testserver.h> using namespace mbus; +using namespace std::chrono_literals; + class Test : public vespalib::TestApp { public: @@ -42,7 +44,7 @@ Test::testZeroTimeout() TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok); Receptor srcHandler; - SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0)); + SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0s)); Receptor dstHandler; DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); @@ -50,7 +52,7 @@ Test::testZeroTimeout() ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted()); Reply::UP reply = srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); } @@ -63,19 +65,19 @@ Test::testMessageExpires() TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok); Receptor srcHandler, dstHandler; - SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1)); + SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1s)); DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler); ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted()); Reply::UP reply = srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); Message::UP msg = dstHandler.getMessage(1); - if (msg.get() != NULL) { + if (msg) { msg->discard(); } } diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h index 0e171b9f6d0..c8de001bd97 100644 --- a/messagebus/src/vespa/messagebus/common.h +++ b/messagebus/src/vespa/messagebus/common.h @@ -2,11 +2,15 @@ #pragma once #include <vespa/vespalib/stllike/string.h> +#include <chrono> namespace mbus { // Decide the type of string used once -typedef vespalib::string string; +using string = vespalib::string; + +using seconds = std::chrono::duration<double>; + } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp index 94ef718c404..af1d97334f6 100644 --- a/messagebus/src/vespa/messagebus/message.cpp +++ b/messagebus/src/vespa/messagebus/message.cpp @@ -62,15 +62,15 @@ Message::setTimeReceivedNow() return *this; } -uint64_t +steady_clock::time_point Message::getTimeReceived() const { - return duration_cast<milliseconds>(_timeReceived.time_since_epoch()).count(); + return _timeReceived; } -uint64_t +milliseconds Message::getTimeRemainingNow() const { - return std::max(0L, _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived).count()); + return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived)); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index 4d4c83b06bd..dce4d86450a 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -3,7 +3,6 @@ #include "routable.h" #include <vespa/messagebus/routing/route.h> -#include <memory> #include <chrono> namespace mbus { @@ -12,23 +11,11 @@ namespace mbus { * A Message is a question, a Reply is the answer. */ class Message : public Routable { -private: - using time_point = std::chrono::steady_clock::time_point; - Route _route; - time_point _timeReceived; - int64_t _timeRemaining; - bool _retryEnabled; - uint32_t _retry; - public: - /** - * Convenience typedef for an auto pointer to a Message object. - */ - typedef std::unique_ptr<Message> UP; + using time_point = std::chrono::steady_clock::time_point; + using milliseconds = std::chrono::milliseconds; + using UP = std::unique_ptr<Message>; - /** - * Constructs a new instance of this class. - */ Message(); Message(const Message &) = delete; Message(Message &&) = delete; @@ -51,7 +38,7 @@ public: * * @return The timestamp this was last seen. */ - uint64_t getTimeReceived() const; + time_point getTimeReceived() const; /** * This is a convenience method to call {@link #setTimeReceived(uint64_t)} @@ -69,7 +56,7 @@ public: * * @return The remaining time in milliseconds. */ - uint64_t getTimeRemaining() const { return _timeRemaining; } + milliseconds getTimeRemaining() const { return _timeRemaining; } /** * Sets the numer of milliseconds that remain before this message times @@ -79,7 +66,7 @@ public: * @param timeRemaining The number of milliseconds until expiration. * @return This, to allow chaining. */ - Message &setTimeRemaining(uint64_t timeRemaining) { _timeRemaining = timeRemaining; return *this; } + Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; } /** * Returns the number of milliseconds that remain right now before this @@ -93,7 +80,7 @@ public: * * @return The remaining time in milliseconds. */ - uint64_t getTimeRemainingNow() const; + milliseconds getTimeRemainingNow() const; /** * Access the route associated with this message. @@ -196,6 +183,12 @@ public: * @return This, to allow chaining. */ Message &setRetry(uint32_t retry) { _retry = retry; return *this; } +private: + Route _route; + time_point _timeReceived; + milliseconds _timeRemaining; + bool _retryEnabled; + uint32_t _retry; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index fd1ad2908c7..d277063a273 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -15,6 +15,7 @@ LOG_SETUP(".messagebus"); using vespalib::LockGuard; using vespalib::make_string; +using namespace std::chrono_literals; namespace { @@ -64,7 +65,7 @@ public: _gate(gate) { } - ~ShutdownTask() { + ~ShutdownTask() override { _gate.countDown(); } @@ -150,7 +151,7 @@ MessageBus::setup(const MessageBusParams ¶ms) if (!_network.start()) { throw vespalib::NetworkSetupFailureException("Failed to start network."); } - if (!_network.waitUntilReady(120)) { + if (!_network.waitUntilReady(120s)) { throw vespalib::NetworkSetupFailureException("Network failed to become ready in time."); } @@ -396,7 +397,7 @@ MessageBus::deliverReply(Reply::UP reply, IReplyHandler &handler) _msn->deliverReply(std::move(reply), handler); } -const string +string MessageBus::getConnectionSpec() const { return _network.getConnectionSpec(); diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index d00444b563a..f10898275b3 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -274,7 +274,7 @@ public: * * @return The connection string. */ - const string getConnectionSpec() const; + string getConnectionSpec() const; /** * Provide access to the underlying {@link Messenger} object. diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h index a22e9fe9e11..037298cf7c0 100644 --- a/messagebus/src/vespa/messagebus/network/inetwork.h +++ b/messagebus/src/vespa/messagebus/network/inetwork.h @@ -62,7 +62,7 @@ public: * @param seconds The timeout. * @return True if ready. */ - virtual bool waitUntilReady(double seconds) const = 0; + virtual bool waitUntilReady(seconds timeout) const = 0; /** * Register a session name with the network layer. This will make the diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e214a435a4e..19e33cb02fd 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -227,12 +227,12 @@ RPCNetwork::start() } bool -RPCNetwork::waitUntilReady(double seconds) const +RPCNetwork::waitUntilReady(seconds timeout) const { slobrok::api::SlobrokList brokerList; slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList); bool hasConfig = false; - for (uint32_t i = 0; i < seconds * 100; ++i) { + for (uint32_t i = 0; i < timeout.count() * 100; ++i) { if (configurator->poll()) { hasConfig = true; } @@ -242,10 +242,10 @@ RPCNetwork::waitUntilReady(double seconds) const std::this_thread::sleep_for(10ms); } if (! hasConfig) { - LOG(error, "failed to get config for slobroks in %d seconds", (int)seconds); + LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count()); } else if (! _mirror->ready()) { auto brokers = brokerList.logString(); - LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds); + LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count()); } return false; } @@ -322,7 +322,7 @@ void RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients) { SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self - double timeout = ctx._msg.getTimeRemainingNow() / 1000.0; + double timeout = ctx._msg.getTimeRemainingNow().count() / 1000.0; for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; @@ -373,13 +373,13 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str())); } else { - uint64_t timeRemaining = ctx._msg.getTimeRemainingNow(); + std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); RPCSendAdapter *adapter = getSendAdapter(ctx._version); if (adapter == nullptr) { replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION, make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str())); - } else if (timeRemaining == 0) { + } else if (timeRemaining == 0ms) { replyError(ctx, ErrorCode::TIMEOUT, "Aborting transmission because zero time remains."); } else if (payload.size() == 0) { replyError(ctx, ErrorCode::ENCODE_ERROR, diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 123987fea68..169bdd86dd9 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -219,7 +219,7 @@ public: void attach(INetworkOwner &owner) override; const string getConnectionSpec() const override; bool start() override; - bool waitUntilReady(double seconds) const override; + bool waitUntilReady(seconds timout) const override; void registerSession(const string &session) override; void unregisterSession(const string &session) override; bool allocServiceAddress(RoutingNode &recipient) override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 87c87173ec7..bed2847f3c6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -98,20 +98,20 @@ RPCSend::handleDiscard(Context ctx) } void -RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining) +RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining) { send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); } void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining) +RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining) { send(recipient, version, FillByCopy(payload), timeRemaining); } void RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & payload, uint64_t timeRemaining) + const PayLoadFiller & payload, milliseconds timeRemaining) { SendContext::UP ctx(new SendContext(recipient, timeRemaining)); RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index 051abff6d1b..e207f7c1812 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -38,13 +38,14 @@ class RPCSend : public RPCSendAdapter, public: class Params { public: + using milliseconds = std::chrono::milliseconds; virtual ~Params() {} virtual vespalib::Version getVersion() const = 0; virtual vespalib::stringref getProtocol() const = 0; virtual uint32_t getTraceLevel() const = 0; virtual bool useRetry() const = 0; virtual uint32_t getRetries() const = 0; - virtual uint64_t getRemainingTime() const = 0; + virtual milliseconds getRemainingTime() const = 0; virtual vespalib::stringref getRoute() const = 0; virtual vespalib::stringref getSession() const = 0; virtual BlobRef getPayload() const = 0; @@ -59,13 +60,13 @@ protected: Error & error, vespalib::TraceNode & rootTrace) const = 0; virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const = 0; + const PayLoadFiller &filler, milliseconds timeRemaining) const = 0; virtual const char * getReturnSpec() const = 0; virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0; virtual std::unique_ptr<Params> toParams(const FRT_Values ¶m) const = 0; void send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & filler, uint64_t timeRemaining); + const PayLoadFiller & filler, milliseconds timeRemaining); std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version, BlobRef payload, Error & error) const; /** @@ -89,9 +90,9 @@ private: void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, uint64_t timeRemaining) final override; + Blob payload, milliseconds timeRemaining) final override; void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, uint64_t timeRemaining) final override; + BlobRef payload, milliseconds timeRemaining) final override; void RequestDone(FRT_RPCRequest *req) final override; void handleReply(std::unique_ptr<Reply> reply) final override; }; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h index f5867e79856..6462bb0691e 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h @@ -16,13 +16,15 @@ private: double _timeout; public: - typedef std::unique_ptr<SendContext> UP; + using milliseconds = std::chrono::milliseconds; + using UP = std::unique_ptr<SendContext>; SendContext(const SendContext &) = delete; SendContext & operator = (const SendContext &) = delete; - SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining) - : _recipient(recipient), - _trace(recipient.getTrace().getLevel()), - _timeout(timeRemaining * 0.001) { } + SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining) + : _recipient(recipient), + _trace(recipient.getTrace().getLevel()), + _timeout(timeRemaining.count() * 0.001) + { } mbus::RoutingNode &getRecipient() { return _recipient; } mbus::Trace &getTrace() { return _trace; } double getTimeout() { return _timeout; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h index d06f3531ca5..9b841811bb2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h @@ -3,6 +3,7 @@ #include <vespa/messagebus/blobref.h> #include <vespa/vespalib/component/version.h> +#include <chrono> namespace mbus { @@ -19,6 +20,7 @@ class RPCSendAdapter protected: RPCSendAdapter() = default; public: + using milliseconds = std::chrono::milliseconds; RPCSendAdapter(const RPCSendAdapter &) = delete; RPCSendAdapter & operator = (const RPCSendAdapter &) = delete; /** @@ -42,7 +44,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, uint64_t timeRemaining) = 0; + BlobRef payload, milliseconds timeRemaining) = 0; /** * Performs the actual sending to the given recipient. @@ -53,7 +55,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, uint64_t timeRemaining) = 0; + Blob payload, milliseconds timeRemaining) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp index 376267b555c..e902aa20965 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp @@ -59,7 +59,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder) void RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const + const PayLoadFiller &filler, milliseconds timeRemaining) const { FRT_Values &args = *req.GetParams(); @@ -69,7 +69,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, args.AddString(address.getSessionName().c_str()); args.AddInt8(msg.getRetryEnabled() ? 1 : 0); args.AddInt32(msg.getRetry()); - args.AddInt64(timeRemaining); + args.AddInt64(timeRemaining.count()); args.AddString(msg.getProtocol().c_str()); filler.fill(args); args.AddInt32(traceLevel); @@ -85,7 +85,7 @@ public: uint32_t getTraceLevel() const override { return _args[8]._intval32; } bool useRetry() const override { return _args[3]._intval8 != 0; } uint32_t getRetries() const override { return _args[4]._intval32; } - uint64_t getRemainingTime() const override { return _args[5]._intval64; } + milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); } vespalib::Version getVersion() const override { return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 37f23335309..3265c304830 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; + const PayLoadFiller &filler, milliseconds timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp index 91a41a6a800..3e453bb60eb 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp @@ -101,7 +101,7 @@ private: void RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const + const PayLoadFiller &filler, milliseconds timeRemaining) const { FRT_Values &args = *req.GetParams(); req.SetMethodName(METHOD_NAME); @@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout root.setString(SESSION_F, address.getSessionName()); root.setBool(USERETRY_F, msg.getRetryEnabled()); root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMELEFT_F, timeRemaining); + root.setLong(TIMELEFT_F, timeRemaining.count()); root.setString(PROTOCOL_F, msg.getProtocol()); root.setLong(TRACELEVEL_F, traceLevel); filler.fill(BLOB_F, root); @@ -156,7 +156,7 @@ public: uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); } bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); } uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); } - uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); } + milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); } Version getVersion() const override { return Version(_slime.get()[VERSION_F].asString().make_stringref()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h index e793868d2aa..939c37c81b2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; + const PayLoadFiller &filler, milliseconds timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index 5385ebd8844..26c30d92375 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -74,7 +74,8 @@ Resender::scheduleRetry(RoutingNode &node) if (delay < 0) { delay = _retryPolicy->getRetryDelay(retry); } - if (msg.getTimeRemainingNow() * 0.001 - delay <= 0) { + milliseconds delayMS(static_cast<long>(delay * 1000)); + if (msg.getTimeRemainingNow() <= delayMS) { node.addError(ErrorCode::TIMEOUT, "Timeout exceeded by resender, giving up."); return false; } @@ -83,7 +84,7 @@ Resender::scheduleRetry(RoutingNode &node) TraceLevel::COMPONENT, vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); - _queue.push(Entry(steady_clock::now() + milliseconds(static_cast<long>(delay * 1000)), &node)); + _queue.push(Entry(steady_clock::now() + delayMS, &node)); return true; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 1f6758b92c3..41fe01625ae 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -9,6 +9,9 @@ using vespalib::make_string; +using namespace std::chrono_literals; +using namespace std::chrono; + namespace mbus { SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams ¶ms) @@ -71,8 +74,8 @@ Result SourceSession::send(Message::UP msg) { msg->setTimeReceivedNow(); - if (msg->getTimeRemaining() == 0) { - msg->setTimeRemaining((uint64_t)(_timeout * 1000)); + if (msg->getTimeRemaining() == 0ms) { + msg->setTimeRemaining(duration_cast<milliseconds>(_timeout)); } { vespalib::MonitorGuard guard(_monitor); @@ -145,7 +148,7 @@ SourceSession & SourceSession::setTimeout(double timeout) { vespalib::MonitorGuard guard(_monitor); - _timeout = timeout; + _timeout = seconds(timeout); return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 2ce8eea384c..31ebec3555e 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -27,7 +27,7 @@ private: Sequencer _sequencer; IReplyHandler &_replyHandler; IThrottlePolicy::SP _throttlePolicy; - double _timeout; + seconds _timeout; uint32_t _pendingCount; bool _closed; bool _done; @@ -53,7 +53,7 @@ public: * messages. After this method returns, messagebus will not invoke any handlers associated with this * session. **/ - virtual ~SourceSession(); + ~SourceSession() override; /** * This is a convenience function to assign a named route to the given message, and then pass it to the diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp index 51fe91562ae..e7a99f2a1de 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp @@ -24,14 +24,14 @@ SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy) return *this; } -double +seconds SourceSessionParams::getTimeout() const { return _timeout; } SourceSessionParams & -SourceSessionParams::setTimeout(double timeout) +SourceSessionParams::setTimeout(seconds timeout) { _timeout = timeout; return *this; diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h index 7c14ee4524d..bc4cad48ba5 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.h +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h @@ -3,10 +3,11 @@ #include "ireplyhandler.h" #include "ithrottlepolicy.h" +#include <chrono> namespace mbus { -/** + /** * To facilitate several configuration parameters to the {@link MessageBus#createSourceSession(ReplyHandler, * SourceSessionParams)}, all parameters are held by this class. This class has reasonable default values for each * parameter. @@ -18,7 +19,7 @@ class SourceSessionParams { private: IReplyHandler *_replyHandler; IThrottlePolicy::SP _throttlePolicy; - double _timeout; + seconds _timeout; public: /** @@ -46,14 +47,14 @@ public: * * @return The total timeout parameter. */ - double getTimeout() const; + seconds getTimeout() const; /** * Returns the number of seconds a message can spend trying to succeed. * * @return The timeout in seconds. */ - SourceSessionParams &setTimeout(double timeout); + SourceSessionParams &setTimeout(seconds timeout); /** * Returns whether or not a reply handler has been assigned to this. diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp index 2a3376336ea..f186be68d01 100644 --- a/messagebus_test/src/tests/error/cpp-client.cpp +++ b/messagebus_test/src/tests/error/cpp-client.cpp @@ -10,6 +10,7 @@ #include <vespa/fastos/app.h> using namespace mbus; +using namespace std::chrono_literals; class App : public FastOS_Application { @@ -29,7 +30,7 @@ App::Main() Message::UP msg; Reply::UP reply; - SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams().setTimeout(300)); + SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams().setTimeout(300s)); for (int i = 0; i < 10; ++i) { msg.reset(new SimpleMessage("test")); msg->getTrace().setLevel(9); diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp index 299b2a01948..59ca47514c9 100644 --- a/messagebus_test/src/tests/speed/cpp-client.cpp +++ b/messagebus_test/src/tests/speed/cpp-client.cpp @@ -10,6 +10,7 @@ #include <vespa/fastos/app.h> using namespace mbus; +using namespace std::chrono_literals; class Client : public IReplyHandler { @@ -95,7 +96,7 @@ App::Main() RPCMessageBus mb(MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(std::make_shared<SimpleProtocol>()), RPCNetworkParams("file:slobrok.cfg").setIdentity(Identity("server/cpp")), "file:routing.cfg"); - Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30)); + Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30s)); // let the system 'warm up' FastOS_Thread::Sleep(5000); diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index f006277a7b6..6ef39ecf858 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -28,6 +28,7 @@ using document::DocumentTypeRepo; using document::readDocumenttypesConfig; using document::test::makeDocumentBucket; using namespace ::testing; +using namespace std::chrono_literals; namespace storage { @@ -186,7 +187,7 @@ TEST_F(DocumentApiConverterTest, get) { TEST_F(DocumentApiConverterTest, create_visitor) { documentapi::CreateVisitorMessage cv("mylib", "myinstance", "control-dest", "data-dest"); cv.setBucketSpace(defaultSpaceName); - cv.setTimeRemaining(123456); + cv.setTimeRemaining(123456ms); auto cmd = toStorageAPI<api::CreateVisitorCommand>(cv); EXPECT_EQ(defaultBucketSpace, cmd->getBucket().getBucketSpace()); @@ -202,7 +203,7 @@ TEST_F(DocumentApiConverterTest, create_visitor) { TEST_F(DocumentApiConverterTest, create_visitor_high_timeout) { documentapi::CreateVisitorMessage cv("mylib", "myinstance", "control-dest", "data-dest"); - cv.setTimeRemaining((uint64_t)std::numeric_limits<uint32_t>::max() + 1); // Will be INT_MAX + cv.setTimeRemaining(std::chrono::milliseconds(std::numeric_limits<uint32_t>::max() + 1)); // Will be INT_MAX auto cmd = toStorageAPI<api::CreateVisitorCommand>(cv); EXPECT_EQ("mylib", cmd->getLibraryName()); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 978d434847e..499d7ce15ac 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -148,7 +148,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) //TODO: Can it be moved ? std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand(); - cmd->setTimeout(storMsgPtr->getTimeRemaining()); + cmd->setTimeout(storMsgPtr->getTimeRemaining().count()); cmd->setTrace(storMsgPtr->getTrace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr))); @@ -567,7 +567,7 @@ CommunicationManager::sendCommand( cmd->setContext(mbus::Context(msg->getMsgId())); cmd->setRetryEnabled(address.retryEnabled()); - cmd->setTimeRemaining(msg->getTimeout()); + cmd->setTimeRemaining(std::chrono::milliseconds(msg->getTimeout())); cmd->setTrace(msg->getTrace()); sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); break; diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index 54b1a9ae257..7318d690004 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -20,6 +20,8 @@ LOG_SETUP(".documentapiconverter"); using document::BucketSpace; +using std::chrono::milliseconds; + namespace storage { DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri, @@ -28,7 +30,7 @@ DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri, _bucketResolver(std::move(bucketResolver)) {} -DocumentApiConverter::~DocumentApiConverter() {} +DocumentApiConverter::~DocumentApiConverter() = default; std::unique_ptr<api::StorageCommand> DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) @@ -139,11 +141,8 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) } if (toMsg.get() != 0) { - int64_t timeout = fromMsg.getTimeRemaining(); - if (timeout > INT_MAX) { - timeout = INT_MAX; - } - toMsg->setTimeout(timeout); + milliseconds timeout = std::max(milliseconds(INT_MAX), fromMsg.getTimeRemaining()); + toMsg->setTimeout(timeout.count()); toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority())); toMsg->setLoadType(fromMsg.getLoadType()); @@ -310,7 +309,7 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) } if (toMsg.get()) { - toMsg->setTimeRemaining(fromMsg.getTimeout()); + toMsg->setTimeRemaining(milliseconds(fromMsg.getTimeout())); toMsg->setContext(mbus::Context(fromMsg.getMsgId())); if (LOG_WOULD_LOG(spam)) { toMsg->getTrace().setLevel(9); diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 3bed02f88fe..4b213dff1d5 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -237,11 +237,11 @@ Visitor::sendMessage(documentapi::DocumentMessage::UP cmd) framework::MicroSecTime time(_component.getClock().getTimeInMicros()); if (time + _docBlockTimeout.getMicros() > _timeToDie) { - cmd->setTimeRemaining((_timeToDie > time) + cmd->setTimeRemaining(std::chrono::milliseconds((_timeToDie > time) ? (_timeToDie - time).getMillis().getTime() - : 0); + : 0)); } else { - cmd->setTimeRemaining(_docBlockTimeout.getTime()); + cmd->setTimeRemaining(std::chrono::milliseconds(_docBlockTimeout.getTime())); } cmd->getTrace().setLevel(_traceLevel); @@ -301,7 +301,7 @@ Visitor::sendInfoMessage(documentapi::VisitorInfoMessage::UP cmd) if (_controlDestination->toString().length()) { cmd->setRoute(_controlDestination->getRoute()); cmd->setPriority(_documentPriority); - cmd->setTimeRemaining(_visitorInfoTimeout.getTime()); + cmd->setTimeRemaining(std::chrono::milliseconds(_visitorInfoTimeout.getTime())); auto& msgMeta = _visitorTarget.insertMessage(std::move(cmd)); sendDocumentApiMessage(msgMeta); } @@ -637,7 +637,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, if (!reply->hasErrors()) { metrics.averageMessageSendTime[getLoadType()].addValue( - (message->getTimeRemaining() - message->getTimeRemainingNow()) / 1000.0); + (message->getTimeRemaining() - message->getTimeRemainingNow()).count() / 1000.0); LOG(debug, "Visitor '%s' reply %s for message ID %" PRIu64 " was OK", _id.c_str(), reply->toString().c_str(), messageId); |