summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-11-18 22:44:04 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-11-20 10:16:58 +0000
commit55069682323b0c204b2ddda696bfcd33f4e47a64 (patch)
tree508042cdb977ec9ea3c6323e8727aa149e09ba54 /messagebus
parent95432b0ec4be8e844fe5433598a9045d0de08fef (diff)
Use C++11 chrono instead prehistoric homegrown stuff.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/message.cpp17
-rw-r--r--messagebus/src/vespa/messagebus/message.h42
-rw-r--r--messagebus/src/vespa/messagebus/reply.h2
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp22
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.h8
-rw-r--r--messagebus/src/vespa/messagebus/systemtimer.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.cpp27
8 files changed, 45 insertions, 89 deletions
diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp
index c6d1bcce36f..94ef718c404 100644
--- a/messagebus/src/vespa/messagebus/message.cpp
+++ b/messagebus/src/vespa/messagebus/message.cpp
@@ -10,6 +10,7 @@
#include <vespa/log/log.h>
LOG_SETUP(".message");
+using namespace std::chrono;
namespace mbus {
Message::Message() :
@@ -29,7 +30,7 @@ Message::~Message()
string backtrace = vespalib::getStackTrace(0);
LOG(warning, "Deleted message %p with non-empty call-stack. Deleted at:\n%s",
this, backtrace.c_str());
- Reply::UP reply(new EmptyReply());
+ auto reply = std::make_unique<EmptyReply>();
swapState(*reply);
reply->addError(Error(ErrorCode::TRANSIENT_ERROR,
"The message object was deleted while containing state information; "
@@ -55,23 +56,21 @@ Message::swapState(Routable &rhs)
}
Message &
-Message::setTimeReceived(uint64_t timeReceived)
+Message::setTimeReceivedNow()
{
- _timeReceived.SetMilliSecs(timeReceived);
+ _timeReceived = steady_clock::now();
return *this;
}
-Message &
-Message::setTimeReceivedNow()
-{
- _timeReceived.SetNow();
- return *this;
+uint64_t
+Message::getTimeReceived() const {
+ return duration_cast<milliseconds>(_timeReceived.time_since_epoch()).count();
}
uint64_t
Message::getTimeRemainingNow() const
{
- return (uint64_t)std::max(0.0, _timeRemaining - _timeReceived.MilliSecsToNow());
+ return std::max(0L, _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived).count());
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h
index 539720374d0..4d4c83b06bd 100644
--- a/messagebus/src/vespa/messagebus/message.h
+++ b/messagebus/src/vespa/messagebus/message.h
@@ -3,8 +3,8 @@
#include "routable.h"
#include <vespa/messagebus/routing/route.h>
-#include <vespa/fastos/time.h>
#include <memory>
+#include <chrono>
namespace mbus {
@@ -13,9 +13,10 @@ namespace mbus {
*/
class Message : public Routable {
private:
+ using time_point = std::chrono::steady_clock::time_point;
Route _route;
- FastOS_Time _timeReceived;
- uint64_t _timeRemaining;
+ time_point _timeReceived;
+ int64_t _timeRemaining;
bool _retryEnabled;
uint32_t _retry;
@@ -39,7 +40,7 @@ public:
* will log an error and generate an auto-reply to avoid having the sender
* wait indefinetly for a reply.
*/
- ~Message();
+ ~Message() override;
void swapState(Routable &rhs) override;
@@ -50,20 +51,7 @@ public:
*
* @return The timestamp this was last seen.
*/
- uint64_t getTimeReceived() const { return (uint64_t)_timeReceived.MilliSecs(); }
-
- /**
- * Sets the timestamp for when this message was last seen by message bus to
- * the given time in milliseconds since epoch. Please see comment on {@link
- * #isExpired()} for more information on how to determine whether or not a
- * message has expired. You should never need to call this method yourself,
- * as it is touched automatically whenever message bus encounters a new
- * message.
- *
- * @param timeReceived The time received in milliseconds.
- * @return This, to allow chaining.
- */
- Message &setTimeReceived(uint64_t timeReceived);
+ uint64_t getTimeReceived() const;
/**
* This is a convenience method to call {@link #setTimeReceived(uint64_t)}
@@ -108,14 +96,6 @@ public:
uint64_t getTimeRemainingNow() const;
/**
- * Returns whether or not this message has expired.
- *
- * @return True if {@link this#getTimeRemainingNow()} is less than or equal
- * to zero.
- */
- bool isExpired() { return getTimeRemainingNow() == 0; }
-
- /**
* Access the route associated with this message.
*
* @return reference to internal route object
@@ -173,16 +153,6 @@ public:
virtual bool hasBucketSequence() { return false; }
/**
- * Returns the identifier used to order message buckets. Any two messages
- * that have the same bucket sequence are ensured to arrive at the NEXT peer
- * in the order they were sent by THIS peer. This value is only respected if
- * the {@link #hasBucketSequence()} method returns true.
- *
- * @return The bucket sequence.
- */
- virtual uint64_t getBucketSequence() { return 0; }
-
- /**
* Obtain the approximate size of this message object in bytes. This enables
* messagebus to track the size of the send queue in both memory usage and
* item count. This method returns 1 by default, and must be overridden to
diff --git a/messagebus/src/vespa/messagebus/reply.h b/messagebus/src/vespa/messagebus/reply.h
index a7040c0cbb9..64b9f5c0b13 100644
--- a/messagebus/src/vespa/messagebus/reply.h
+++ b/messagebus/src/vespa/messagebus/reply.h
@@ -40,7 +40,7 @@ public:
* will log an error and generate an auto-reply to avoid having the sender
* wait indefinetly for a reply.
*/
- ~Reply();
+ ~Reply() override;
void swapState(Routable &rhs) override;
bool isReply() const override;
diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp
index a3ba2ffadd3..eb2d93c6688 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.cpp
+++ b/messagebus/src/vespa/messagebus/routablequeue.cpp
@@ -2,6 +2,8 @@
#include "routablequeue.h"
+using namespace std::chrono;
+
namespace mbus {
RoutableQueue::RoutableQueue()
@@ -39,15 +41,14 @@ RoutableQueue::enqueue(Routable::UP r)
Routable::UP
RoutableQueue::dequeue(uint32_t msTimeout)
{
- FastOS_Time t;
- t.SetNow();
- uint32_t msLeft = msTimeout;
+ steady_clock::time_point startTime = steady_clock::now();
+ uint64_t msLeft = msTimeout;
vespalib::MonitorGuard guard(_monitor);
while (_queue.size() == 0 && msLeft > 0) {
if (!guard.wait(msLeft) || _queue.size() > 0) {
break;
}
- uint32_t elapsed = (uint32_t)t.MilliSecsToNow();
+ uint64_t elapsed = duration_cast<milliseconds>(steady_clock::now() - startTime).count();
msLeft = (elapsed > msTimeout) ? 0 : msTimeout - elapsed;
}
if (_queue.size() == 0) {
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index 0ba6e0827b2..5385ebd8844 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -6,15 +6,14 @@
#include <vespa/messagebus/tracelevel.h>
#include <vespa/vespalib/util/stringfmt.h>
+using namespace std::chrono;
+
namespace mbus {
Resender::Resender(IRetryPolicy::SP retryPolicy) :
_queue(),
- _retryPolicy(retryPolicy),
- _time()
-{
- _time.SetNow();
-}
+ _retryPolicy(retryPolicy)
+{ }
Resender::~Resender()
{
@@ -30,18 +29,15 @@ Resender::resendScheduled()
typedef std::vector<RoutingNode*> NodeList;
NodeList sendList;
- double now = _time.MilliSecsToNow();
+ time_point now = steady_clock::now();
while (!_queue.empty() && _queue.top().first <= now) {
sendList.push_back(_queue.top().second);
_queue.pop();
}
- for (NodeList::iterator it = sendList.begin();
- it != sendList.end(); ++it)
- {
- (*it)->getTrace().trace(mbus::TraceLevel::COMPONENT,
- "Resender resending message.");
- (*it)->send();
+ for (RoutingNode *node : sendList) {
+ node->getTrace().trace(mbus::TraceLevel::COMPONENT, "Resender resending message.");
+ node->send();
}
}
@@ -87,7 +83,7 @@ Resender::scheduleRetry(RoutingNode &node)
TraceLevel::COMPONENT,
vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
- _queue.push(Entry((uint64_t)(_time.MilliSecsToNow() + delay * 1000), &node));
+ _queue.push(Entry(steady_clock::now() + milliseconds(static_cast<long>(delay * 1000)), &node));
return true;
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h
index 93752dcfd5c..68b49cde606 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.h
+++ b/messagebus/src/vespa/messagebus/routing/resender.h
@@ -5,7 +5,6 @@
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
#include <vespa/vespalib/util/sync.h>
-#include <vespa/fastos/time.h>
#include <queue>
#include <vector>
@@ -23,18 +22,17 @@ class RoutingNode;
class Resender
{
private:
- typedef std::pair<uint64_t, RoutingNode*> Entry;
+ using time_point = std::chrono::steady_clock::time_point;
+ typedef std::pair<time_point , RoutingNode*> Entry;
struct Cmp {
bool operator()(const Entry &a, const Entry &b) {
return (b.first < a.first);
}
};
- typedef std::priority_queue<Entry, std::vector<Entry>, Cmp> PriorityQueue;
+ using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;
PriorityQueue _queue;
IRetryPolicy::SP _retryPolicy;
- FastOS_Time _time;
-
public:
/**
* Convenience typedefs.
diff --git a/messagebus/src/vespa/messagebus/systemtimer.cpp b/messagebus/src/vespa/messagebus/systemtimer.cpp
index 6c787509fbf..0570d81ce68 100644
--- a/messagebus/src/vespa/messagebus/systemtimer.cpp
+++ b/messagebus/src/vespa/messagebus/systemtimer.cpp
@@ -1,15 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "systemtimer.h"
-#include <vespa/fastos/time.h>
+#include <chrono>
+using namespace std::chrono;
namespace mbus {
uint64_t
SystemTimer::getMilliTime() const
{
- FastOS_Time time;
- time.SetNow();
- return (uint64_t)time.MilliSecs();
+ return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
index f821021a482..f98a4be05c3 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
@@ -2,17 +2,12 @@
#include "receptor.h"
-namespace mbus {
+using namespace std::chrono;
-Receptor::Receptor()
- : IMessageHandler(),
- IReplyHandler(),
- _mon(),
- _msg(),
- _reply()
-{ }
+namespace mbus {
-Receptor::~Receptor() {}
+Receptor::Receptor() = default;
+Receptor::~Receptor() = default;
void
Receptor::handleMessage(Message::UP msg)
@@ -33,12 +28,11 @@ Receptor::handleReply(Reply::UP reply)
Message::UP
Receptor::getMessage(double maxWait)
{
- int ms = (int)(maxWait * 1000);
- FastOS_Time startTime;
- startTime.SetNow();
+ int64_t ms = (int64_t)(maxWait * 1000);
+ steady_clock::time_point startTime = steady_clock::now();
vespalib::MonitorGuard guard(_mon);
while (_msg.get() == 0) {
- int w = ms - (int)startTime.MilliSecsToNow();
+ int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count();
if (w <= 0 || !guard.wait(w)) {
break;
}
@@ -49,12 +43,11 @@ Receptor::getMessage(double maxWait)
Reply::UP
Receptor::getReply(double maxWait)
{
- int ms = (int)(maxWait * 1000);
- FastOS_Time startTime;
- startTime.SetNow();
+ int64_t ms = (int)(maxWait * 1000);
+ steady_clock::time_point startTime = steady_clock::now();
vespalib::MonitorGuard guard(_mon);
while (_reply.get() == 0) {
- int w = ms - (int)startTime.MilliSecsToNow();
+ int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count();
if (w <= 0 || !guard.wait(w)) {
break;
}