summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-11-19 14:06:57 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-11-20 10:16:58 +0000
commit86a245e6d0751dec7e38d64c4ea21ea7f4308239 (patch)
tree108cb5b7a96f96a0db35e95b13bf08e7dadad4d4 /messagebus
parent496e5cf57060fb068a9dc73ac7ff44333a60774e (diff)
Use timeouts typed with unit.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/loadbalance/loadbalance.cpp3
-rw-r--r--messagebus/src/tests/messageordering/messageordering.cpp5
-rw-r--r--messagebus/src/tests/routable/routable.cpp13
-rw-r--r--messagebus/src/tests/timeout/timeout.cpp12
-rw-r--r--messagebus/src/vespa/messagebus/common.h6
-rw-r--r--messagebus/src/vespa/messagebus/message.cpp8
-rw-r--r--messagebus/src/vespa/messagebus/message.h33
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/inetwork.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h11
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h12
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendadapter.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h2
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.h4
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.h9
25 files changed, 101 insertions, 88 deletions
diff --git a/messagebus/src/tests/loadbalance/loadbalance.cpp b/messagebus/src/tests/loadbalance/loadbalance.cpp
index ebd8d6e250b..2f510d98ff1 100644
--- a/messagebus/src/tests/loadbalance/loadbalance.cpp
+++ b/messagebus/src/tests/loadbalance/loadbalance.cpp
@@ -16,6 +16,7 @@
#include <vespa/messagebus/testlib/testserver.h>
using namespace mbus;
+using namespace std::chrono_literals;
struct Handler : public IMessageHandler
{
@@ -64,7 +65,7 @@ Test::Main()
RoutableQueue queue;
SourceSessionParams params;
- params.setTimeout(30.0);
+ params.setTimeout(30s);
params.setThrottlePolicy(IThrottlePolicy::SP());
SourceSession::UP ss = src.mb.createSourceSession(queue, params);
diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp
index d1b512bb1f1..5a4074f7f02 100644
--- a/messagebus/src/tests/messageordering/messageordering.cpp
+++ b/messagebus/src/tests/messageordering/messageordering.cpp
@@ -13,6 +13,7 @@
LOG_SETUP("messageordering_test");
using namespace mbus;
+using namespace std::chrono_literals;
TEST_SETUP(Test);
@@ -151,10 +152,10 @@ Test::Main()
SourceSessionParams ssp;
ssp.setThrottlePolicy(IThrottlePolicy::SP());
- ssp.setTimeout(400);
+ ssp.setTimeout(400s);
SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp);
DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst);
- ASSERT_EQUAL(400u, ssp.getTimeout());
+ ASSERT_EQUAL(400.0, ssp.getTimeout().count());
// wait for slobrok registration
ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session"));
diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp
index 18427a933cd..7c90a7f958f 100644
--- a/messagebus/src/tests/routable/routable.cpp
+++ b/messagebus/src/tests/routable/routable.cpp
@@ -11,6 +11,7 @@
#include <vespa/vespalib/testkit/testapp.h>
using namespace mbus;
+using namespace std::chrono_literals;
TEST_SETUP(Test);
@@ -26,14 +27,14 @@ Test::Main()
foo.setRoute(fooRoute);
foo.setRetry(1);
foo.setTimeReceivedNow();
- foo.setTimeRemaining(2);
+ foo.setTimeRemaining(2ms);
SimpleMessage bar("bar");
Route barRoute = Route::parse("bar");
bar.setRoute(barRoute);
bar.setRetry(3);
bar.setTimeReceivedNow();
- bar.setTimeRemaining(4);
+ bar.setTimeRemaining(4ms);
foo.swapState(bar);
EXPECT_EQUAL(barRoute.toString(), foo.getRoute().toString());
@@ -41,8 +42,8 @@ Test::Main()
EXPECT_EQUAL(3u, foo.getRetry());
EXPECT_EQUAL(1u, bar.getRetry());
EXPECT_TRUE(foo.getTimeReceived() >= bar.getTimeReceived());
- EXPECT_EQUAL(4u, foo.getTimeRemaining());
- EXPECT_EQUAL(2u, bar.getTimeRemaining());
+ EXPECT_EQUAL(4u, foo.getTimeRemaining().count());
+ EXPECT_EQUAL(2u, bar.getTimeRemaining().count());
}
{
// Test reply swap state.
@@ -73,7 +74,7 @@ Test::Main()
msg.discard();
Reply::UP reply = handler.getReply(0);
- ASSERT_TRUE(reply.get() == NULL);
+ ASSERT_FALSE(reply);
}
{
// Test reply discard logic.
@@ -86,7 +87,7 @@ Test::Main()
reply.discard();
Reply::UP ap = handler.getReply(0);
- ASSERT_TRUE(ap.get() == NULL);
+ ASSERT_FALSE(ap);
}
TEST_DONE();
diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp
index 03a2c61a052..b2631e13d9c 100644
--- a/messagebus/src/tests/timeout/timeout.cpp
+++ b/messagebus/src/tests/timeout/timeout.cpp
@@ -13,6 +13,8 @@
#include <vespa/messagebus/testlib/testserver.h>
using namespace mbus;
+using namespace std::chrono_literals;
+
class Test : public vespalib::TestApp {
public:
@@ -42,7 +44,7 @@ Test::testZeroTimeout()
TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);
Receptor srcHandler;
- SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0));
+ SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0s));
Receptor dstHandler;
DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);
@@ -50,7 +52,7 @@ Test::testZeroTimeout()
ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());
Reply::UP reply = srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
}
@@ -63,19 +65,19 @@ Test::testMessageExpires()
TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);
Receptor srcHandler, dstHandler;
- SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1));
+ SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1s));
DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());
Reply::UP reply = srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
Message::UP msg = dstHandler.getMessage(1);
- if (msg.get() != NULL) {
+ if (msg) {
msg->discard();
}
}
diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h
index 0e171b9f6d0..c8de001bd97 100644
--- a/messagebus/src/vespa/messagebus/common.h
+++ b/messagebus/src/vespa/messagebus/common.h
@@ -2,11 +2,15 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
+#include <chrono>
namespace mbus {
// Decide the type of string used once
-typedef vespalib::string string;
+using string = vespalib::string;
+
+using seconds = std::chrono::duration<double>;
+
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp
index 94ef718c404..af1d97334f6 100644
--- a/messagebus/src/vespa/messagebus/message.cpp
+++ b/messagebus/src/vespa/messagebus/message.cpp
@@ -62,15 +62,15 @@ Message::setTimeReceivedNow()
return *this;
}
-uint64_t
+steady_clock::time_point
Message::getTimeReceived() const {
- return duration_cast<milliseconds>(_timeReceived.time_since_epoch()).count();
+ return _timeReceived;
}
-uint64_t
+milliseconds
Message::getTimeRemainingNow() const
{
- return std::max(0L, _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived).count());
+ return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived));
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h
index 4d4c83b06bd..dce4d86450a 100644
--- a/messagebus/src/vespa/messagebus/message.h
+++ b/messagebus/src/vespa/messagebus/message.h
@@ -3,7 +3,6 @@
#include "routable.h"
#include <vespa/messagebus/routing/route.h>
-#include <memory>
#include <chrono>
namespace mbus {
@@ -12,23 +11,11 @@ namespace mbus {
* A Message is a question, a Reply is the answer.
*/
class Message : public Routable {
-private:
- using time_point = std::chrono::steady_clock::time_point;
- Route _route;
- time_point _timeReceived;
- int64_t _timeRemaining;
- bool _retryEnabled;
- uint32_t _retry;
-
public:
- /**
- * Convenience typedef for an auto pointer to a Message object.
- */
- typedef std::unique_ptr<Message> UP;
+ using time_point = std::chrono::steady_clock::time_point;
+ using milliseconds = std::chrono::milliseconds;
+ using UP = std::unique_ptr<Message>;
- /**
- * Constructs a new instance of this class.
- */
Message();
Message(const Message &) = delete;
Message(Message &&) = delete;
@@ -51,7 +38,7 @@ public:
*
* @return The timestamp this was last seen.
*/
- uint64_t getTimeReceived() const;
+ time_point getTimeReceived() const;
/**
* This is a convenience method to call {@link #setTimeReceived(uint64_t)}
@@ -69,7 +56,7 @@ public:
*
* @return The remaining time in milliseconds.
*/
- uint64_t getTimeRemaining() const { return _timeRemaining; }
+ milliseconds getTimeRemaining() const { return _timeRemaining; }
/**
* Sets the numer of milliseconds that remain before this message times
@@ -79,7 +66,7 @@ public:
* @param timeRemaining The number of milliseconds until expiration.
* @return This, to allow chaining.
*/
- Message &setTimeRemaining(uint64_t timeRemaining) { _timeRemaining = timeRemaining; return *this; }
+ Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; }
/**
* Returns the number of milliseconds that remain right now before this
@@ -93,7 +80,7 @@ public:
*
* @return The remaining time in milliseconds.
*/
- uint64_t getTimeRemainingNow() const;
+ milliseconds getTimeRemainingNow() const;
/**
* Access the route associated with this message.
@@ -196,6 +183,12 @@ public:
* @return This, to allow chaining.
*/
Message &setRetry(uint32_t retry) { _retry = retry; return *this; }
+private:
+ Route _route;
+ time_point _timeReceived;
+ milliseconds _timeRemaining;
+ bool _retryEnabled;
+ uint32_t _retry;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index fd1ad2908c7..d277063a273 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -15,6 +15,7 @@ LOG_SETUP(".messagebus");
using vespalib::LockGuard;
using vespalib::make_string;
+using namespace std::chrono_literals;
namespace {
@@ -64,7 +65,7 @@ public:
_gate(gate)
{ }
- ~ShutdownTask() {
+ ~ShutdownTask() override {
_gate.countDown();
}
@@ -150,7 +151,7 @@ MessageBus::setup(const MessageBusParams &params)
if (!_network.start()) {
throw vespalib::NetworkSetupFailureException("Failed to start network.");
}
- if (!_network.waitUntilReady(120)) {
+ if (!_network.waitUntilReady(120s)) {
throw vespalib::NetworkSetupFailureException("Network failed to become ready in time.");
}
@@ -396,7 +397,7 @@ MessageBus::deliverReply(Reply::UP reply, IReplyHandler &handler)
_msn->deliverReply(std::move(reply), handler);
}
-const string
+string
MessageBus::getConnectionSpec() const
{
return _network.getConnectionSpec();
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index d00444b563a..f10898275b3 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -274,7 +274,7 @@ public:
*
* @return The connection string.
*/
- const string getConnectionSpec() const;
+ string getConnectionSpec() const;
/**
* Provide access to the underlying {@link Messenger} object.
diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h
index a22e9fe9e11..037298cf7c0 100644
--- a/messagebus/src/vespa/messagebus/network/inetwork.h
+++ b/messagebus/src/vespa/messagebus/network/inetwork.h
@@ -62,7 +62,7 @@ public:
* @param seconds The timeout.
* @return True if ready.
*/
- virtual bool waitUntilReady(double seconds) const = 0;
+ virtual bool waitUntilReady(seconds timeout) const = 0;
/**
* Register a session name with the network layer. This will make the
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index e214a435a4e..19e33cb02fd 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -227,12 +227,12 @@ RPCNetwork::start()
}
bool
-RPCNetwork::waitUntilReady(double seconds) const
+RPCNetwork::waitUntilReady(seconds timeout) const
{
slobrok::api::SlobrokList brokerList;
slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList);
bool hasConfig = false;
- for (uint32_t i = 0; i < seconds * 100; ++i) {
+ for (uint32_t i = 0; i < timeout.count() * 100; ++i) {
if (configurator->poll()) {
hasConfig = true;
}
@@ -242,10 +242,10 @@ RPCNetwork::waitUntilReady(double seconds) const
std::this_thread::sleep_for(10ms);
}
if (! hasConfig) {
- LOG(error, "failed to get config for slobroks in %d seconds", (int)seconds);
+ LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count());
} else if (! _mirror->ready()) {
auto brokers = brokerList.logString();
- LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds);
+ LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count());
}
return false;
}
@@ -322,7 +322,7 @@ void
RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients)
{
SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self
- double timeout = ctx._msg.getTimeRemainingNow() / 1000.0;
+ double timeout = ctx._msg.getTimeRemainingNow().count() / 1000.0;
for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) {
RoutingNode *&recipient = ctx._recipients[i];
@@ -373,13 +373,13 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx)
make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str()));
} else {
- uint64_t timeRemaining = ctx._msg.getTimeRemainingNow();
+ std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
RPCSendAdapter *adapter = getSendAdapter(ctx._version);
if (adapter == nullptr) {
replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION,
make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str()));
- } else if (timeRemaining == 0) {
+ } else if (timeRemaining == 0ms) {
replyError(ctx, ErrorCode::TIMEOUT, "Aborting transmission because zero time remains.");
} else if (payload.size() == 0) {
replyError(ctx, ErrorCode::ENCODE_ERROR,
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 123987fea68..169bdd86dd9 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -219,7 +219,7 @@ public:
void attach(INetworkOwner &owner) override;
const string getConnectionSpec() const override;
bool start() override;
- bool waitUntilReady(double seconds) const override;
+ bool waitUntilReady(seconds timout) const override;
void registerSession(const string &session) override;
void unregisterSession(const string &session) override;
bool allocServiceAddress(RoutingNode &recipient) override;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 87c87173ec7..bed2847f3c6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -98,20 +98,20 @@ RPCSend::handleDiscard(Context ctx)
}
void
-RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining)
+RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining)
{
send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
}
void
-RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining)
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining)
{
send(recipient, version, FillByCopy(payload), timeRemaining);
}
void
RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & payload, uint64_t timeRemaining)
+ const PayLoadFiller & payload, milliseconds timeRemaining)
{
SendContext::UP ctx(new SendContext(recipient, timeRemaining));
RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index 051abff6d1b..e207f7c1812 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -38,13 +38,14 @@ class RPCSend : public RPCSendAdapter,
public:
class Params {
public:
+ using milliseconds = std::chrono::milliseconds;
virtual ~Params() {}
virtual vespalib::Version getVersion() const = 0;
virtual vespalib::stringref getProtocol() const = 0;
virtual uint32_t getTraceLevel() const = 0;
virtual bool useRetry() const = 0;
virtual uint32_t getRetries() const = 0;
- virtual uint64_t getRemainingTime() const = 0;
+ virtual milliseconds getRemainingTime() const = 0;
virtual vespalib::stringref getRoute() const = 0;
virtual vespalib::stringref getSession() const = 0;
virtual BlobRef getPayload() const = 0;
@@ -59,13 +60,13 @@ protected:
Error & error, vespalib::TraceNode & rootTrace) const = 0;
virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const = 0;
+ const PayLoadFiller &filler, milliseconds timeRemaining) const = 0;
virtual const char * getReturnSpec() const = 0;
virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0;
virtual std::unique_ptr<Params> toParams(const FRT_Values &param) const = 0;
void send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & filler, uint64_t timeRemaining);
+ const PayLoadFiller & filler, milliseconds timeRemaining);
std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version,
BlobRef payload, Error & error) const;
/**
@@ -89,9 +90,9 @@ private:
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining) final override;
+ Blob payload, milliseconds timeRemaining) final override;
void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining) final override;
+ BlobRef payload, milliseconds timeRemaining) final override;
void RequestDone(FRT_RPCRequest *req) final override;
void handleReply(std::unique_ptr<Reply> reply) final override;
};
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
index f5867e79856..6462bb0691e 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
@@ -16,13 +16,15 @@ private:
double _timeout;
public:
- typedef std::unique_ptr<SendContext> UP;
+ using milliseconds = std::chrono::milliseconds;
+ using UP = std::unique_ptr<SendContext>;
SendContext(const SendContext &) = delete;
SendContext & operator = (const SendContext &) = delete;
- SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
- : _recipient(recipient),
- _trace(recipient.getTrace().getLevel()),
- _timeout(timeRemaining * 0.001) { }
+ SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining)
+ : _recipient(recipient),
+ _trace(recipient.getTrace().getLevel()),
+ _timeout(timeRemaining.count() * 0.001)
+ { }
mbus::RoutingNode &getRecipient() { return _recipient; }
mbus::Trace &getTrace() { return _trace; }
double getTimeout() { return _timeout; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
index d06f3531ca5..9b841811bb2 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
@@ -3,6 +3,7 @@
#include <vespa/messagebus/blobref.h>
#include <vespa/vespalib/component/version.h>
+#include <chrono>
namespace mbus {
@@ -19,6 +20,7 @@ class RPCSendAdapter
protected:
RPCSendAdapter() = default;
public:
+ using milliseconds = std::chrono::milliseconds;
RPCSendAdapter(const RPCSendAdapter &) = delete;
RPCSendAdapter & operator = (const RPCSendAdapter &) = delete;
/**
@@ -42,7 +44,7 @@ public:
* @param timeRemaining The time remaining until the message expires.
*/
virtual void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining) = 0;
+ BlobRef payload, milliseconds timeRemaining) = 0;
/**
* Performs the actual sending to the given recipient.
@@ -53,7 +55,7 @@ public:
* @param timeRemaining The time remaining until the message expires.
*/
virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining) = 0;
+ Blob payload, milliseconds timeRemaining) = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
index 376267b555c..e902aa20965 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
@@ -59,7 +59,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder)
void
RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const
+ const PayLoadFiller &filler, milliseconds timeRemaining) const
{
FRT_Values &args = *req.GetParams();
@@ -69,7 +69,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version,
args.AddString(address.getSessionName().c_str());
args.AddInt8(msg.getRetryEnabled() ? 1 : 0);
args.AddInt32(msg.getRetry());
- args.AddInt64(timeRemaining);
+ args.AddInt64(timeRemaining.count());
args.AddString(msg.getProtocol().c_str());
filler.fill(args);
args.AddInt32(traceLevel);
@@ -85,7 +85,7 @@ public:
uint32_t getTraceLevel() const override { return _args[8]._intval32; }
bool useRetry() const override { return _args[3]._intval8 != 0; }
uint32_t getRetries() const override { return _args[4]._intval32; }
- uint64_t getRemainingTime() const override { return _args[5]._intval64; }
+ milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); }
vespalib::Version getVersion() const override {
return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len));
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
index 37f23335309..3265c304830 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
@@ -14,7 +14,7 @@ private:
std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+ const PayLoadFiller &filler, milliseconds timeRemaining) const override;
std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
Error & error, vespalib::TraceNode & rootTrace) const override;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
index 91a41a6a800..3e453bb60eb 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
@@ -101,7 +101,7 @@ private:
void
RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const
+ const PayLoadFiller &filler, milliseconds timeRemaining) const
{
FRT_Values &args = *req.GetParams();
req.SetMethodName(METHOD_NAME);
@@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout
root.setString(SESSION_F, address.getSessionName());
root.setBool(USERETRY_F, msg.getRetryEnabled());
root.setLong(RETRY_F, msg.getRetry());
- root.setLong(TIMELEFT_F, timeRemaining);
+ root.setLong(TIMELEFT_F, timeRemaining.count());
root.setString(PROTOCOL_F, msg.getProtocol());
root.setLong(TRACELEVEL_F, traceLevel);
filler.fill(BLOB_F, root);
@@ -156,7 +156,7 @@ public:
uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); }
bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); }
uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); }
- uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); }
+ milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); }
Version getVersion() const override {
return Version(_slime.get()[VERSION_F].asString().make_stringref());
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
index e793868d2aa..939c37c81b2 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
@@ -14,7 +14,7 @@ private:
std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+ const PayLoadFiller &filler, milliseconds timeRemaining) const override;
std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
Error & error, vespalib::TraceNode & rootTrace) const override;
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index 5385ebd8844..26c30d92375 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -74,7 +74,8 @@ Resender::scheduleRetry(RoutingNode &node)
if (delay < 0) {
delay = _retryPolicy->getRetryDelay(retry);
}
- if (msg.getTimeRemainingNow() * 0.001 - delay <= 0) {
+ milliseconds delayMS(static_cast<long>(delay * 1000));
+ if (msg.getTimeRemainingNow() <= delayMS) {
node.addError(ErrorCode::TIMEOUT, "Timeout exceeded by resender, giving up.");
return false;
}
@@ -83,7 +84,7 @@ Resender::scheduleRetry(RoutingNode &node)
TraceLevel::COMPONENT,
vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
- _queue.push(Entry(steady_clock::now() + milliseconds(static_cast<long>(delay * 1000)), &node));
+ _queue.push(Entry(steady_clock::now() + delayMS, &node));
return true;
}
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 1f6758b92c3..41fe01625ae 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -9,6 +9,9 @@
using vespalib::make_string;
+using namespace std::chrono_literals;
+using namespace std::chrono;
+
namespace mbus {
SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams &params)
@@ -71,8 +74,8 @@ Result
SourceSession::send(Message::UP msg)
{
msg->setTimeReceivedNow();
- if (msg->getTimeRemaining() == 0) {
- msg->setTimeRemaining((uint64_t)(_timeout * 1000));
+ if (msg->getTimeRemaining() == 0ms) {
+ msg->setTimeRemaining(duration_cast<milliseconds>(_timeout));
}
{
vespalib::MonitorGuard guard(_monitor);
@@ -145,7 +148,7 @@ SourceSession &
SourceSession::setTimeout(double timeout)
{
vespalib::MonitorGuard guard(_monitor);
- _timeout = timeout;
+ _timeout = seconds(timeout);
return *this;
}
diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h
index 2ce8eea384c..31ebec3555e 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.h
+++ b/messagebus/src/vespa/messagebus/sourcesession.h
@@ -27,7 +27,7 @@ private:
Sequencer _sequencer;
IReplyHandler &_replyHandler;
IThrottlePolicy::SP _throttlePolicy;
- double _timeout;
+ seconds _timeout;
uint32_t _pendingCount;
bool _closed;
bool _done;
@@ -53,7 +53,7 @@ public:
* messages. After this method returns, messagebus will not invoke any handlers associated with this
* session.
**/
- virtual ~SourceSession();
+ ~SourceSession() override;
/**
* This is a convenience function to assign a named route to the given message, and then pass it to the
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
index 51fe91562ae..e7a99f2a1de 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
@@ -24,14 +24,14 @@ SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy)
return *this;
}
-double
+seconds
SourceSessionParams::getTimeout() const
{
return _timeout;
}
SourceSessionParams &
-SourceSessionParams::setTimeout(double timeout)
+SourceSessionParams::setTimeout(seconds timeout)
{
_timeout = timeout;
return *this;
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h
index 7c14ee4524d..bc4cad48ba5 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.h
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h
@@ -3,10 +3,11 @@
#include "ireplyhandler.h"
#include "ithrottlepolicy.h"
+#include <chrono>
namespace mbus {
-/**
+ /**
* To facilitate several configuration parameters to the {@link MessageBus#createSourceSession(ReplyHandler,
* SourceSessionParams)}, all parameters are held by this class. This class has reasonable default values for each
* parameter.
@@ -18,7 +19,7 @@ class SourceSessionParams {
private:
IReplyHandler *_replyHandler;
IThrottlePolicy::SP _throttlePolicy;
- double _timeout;
+ seconds _timeout;
public:
/**
@@ -46,14 +47,14 @@ public:
*
* @return The total timeout parameter.
*/
- double getTimeout() const;
+ seconds getTimeout() const;
/**
* Returns the number of seconds a message can spend trying to succeed.
*
* @return The timeout in seconds.
*/
- SourceSessionParams &setTimeout(double timeout);
+ SourceSessionParams &setTimeout(seconds timeout);
/**
* Returns whether or not a reply handler has been assigned to this.