diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-18 22:44:04 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-20 10:16:58 +0000 |
commit | 55069682323b0c204b2ddda696bfcd33f4e47a64 (patch) | |
tree | 508042cdb977ec9ea3c6323e8727aa149e09ba54 /messagebus | |
parent | 95432b0ec4be8e844fe5433598a9045d0de08fef (diff) |
Use C++11 chrono instead prehistoric homegrown stuff.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/vespa/messagebus/message.cpp | 17 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/message.h | 42 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/reply.h | 2 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/routablequeue.cpp | 9 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/routing/resender.cpp | 22 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/routing/resender.h | 8 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/systemtimer.cpp | 7 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/testlib/receptor.cpp | 27 |
8 files changed, 45 insertions, 89 deletions
diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp index c6d1bcce36f..94ef718c404 100644 --- a/messagebus/src/vespa/messagebus/message.cpp +++ b/messagebus/src/vespa/messagebus/message.cpp @@ -10,6 +10,7 @@ #include <vespa/log/log.h> LOG_SETUP(".message"); +using namespace std::chrono; namespace mbus { Message::Message() : @@ -29,7 +30,7 @@ Message::~Message() string backtrace = vespalib::getStackTrace(0); LOG(warning, "Deleted message %p with non-empty call-stack. Deleted at:\n%s", this, backtrace.c_str()); - Reply::UP reply(new EmptyReply()); + auto reply = std::make_unique<EmptyReply>(); swapState(*reply); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "The message object was deleted while containing state information; " @@ -55,23 +56,21 @@ Message::swapState(Routable &rhs) } Message & -Message::setTimeReceived(uint64_t timeReceived) +Message::setTimeReceivedNow() { - _timeReceived.SetMilliSecs(timeReceived); + _timeReceived = steady_clock::now(); return *this; } -Message & -Message::setTimeReceivedNow() -{ - _timeReceived.SetNow(); - return *this; +uint64_t +Message::getTimeReceived() const { + return duration_cast<milliseconds>(_timeReceived.time_since_epoch()).count(); } uint64_t Message::getTimeRemainingNow() const { - return (uint64_t)std::max(0.0, _timeRemaining - _timeReceived.MilliSecsToNow()); + return std::max(0L, _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived).count()); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index 539720374d0..4d4c83b06bd 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -3,8 +3,8 @@ #include "routable.h" #include <vespa/messagebus/routing/route.h> -#include <vespa/fastos/time.h> #include <memory> +#include <chrono> namespace mbus { @@ -13,9 +13,10 @@ namespace mbus { */ class Message : public Routable { private: + using time_point = std::chrono::steady_clock::time_point; Route _route; - FastOS_Time _timeReceived; - uint64_t _timeRemaining; + time_point _timeReceived; + int64_t _timeRemaining; bool _retryEnabled; uint32_t _retry; @@ -39,7 +40,7 @@ public: * will log an error and generate an auto-reply to avoid having the sender * wait indefinetly for a reply. */ - ~Message(); + ~Message() override; void swapState(Routable &rhs) override; @@ -50,20 +51,7 @@ public: * * @return The timestamp this was last seen. */ - uint64_t getTimeReceived() const { return (uint64_t)_timeReceived.MilliSecs(); } - - /** - * Sets the timestamp for when this message was last seen by message bus to - * the given time in milliseconds since epoch. Please see comment on {@link - * #isExpired()} for more information on how to determine whether or not a - * message has expired. You should never need to call this method yourself, - * as it is touched automatically whenever message bus encounters a new - * message. - * - * @param timeReceived The time received in milliseconds. - * @return This, to allow chaining. - */ - Message &setTimeReceived(uint64_t timeReceived); + uint64_t getTimeReceived() const; /** * This is a convenience method to call {@link #setTimeReceived(uint64_t)} @@ -108,14 +96,6 @@ public: uint64_t getTimeRemainingNow() const; /** - * Returns whether or not this message has expired. - * - * @return True if {@link this#getTimeRemainingNow()} is less than or equal - * to zero. - */ - bool isExpired() { return getTimeRemainingNow() == 0; } - - /** * Access the route associated with this message. * * @return reference to internal route object @@ -173,16 +153,6 @@ public: virtual bool hasBucketSequence() { return false; } /** - * Returns the identifier used to order message buckets. Any two messages - * that have the same bucket sequence are ensured to arrive at the NEXT peer - * in the order they were sent by THIS peer. This value is only respected if - * the {@link #hasBucketSequence()} method returns true. - * - * @return The bucket sequence. - */ - virtual uint64_t getBucketSequence() { return 0; } - - /** * Obtain the approximate size of this message object in bytes. This enables * messagebus to track the size of the send queue in both memory usage and * item count. This method returns 1 by default, and must be overridden to diff --git a/messagebus/src/vespa/messagebus/reply.h b/messagebus/src/vespa/messagebus/reply.h index a7040c0cbb9..64b9f5c0b13 100644 --- a/messagebus/src/vespa/messagebus/reply.h +++ b/messagebus/src/vespa/messagebus/reply.h @@ -40,7 +40,7 @@ public: * will log an error and generate an auto-reply to avoid having the sender * wait indefinetly for a reply. */ - ~Reply(); + ~Reply() override; void swapState(Routable &rhs) override; bool isReply() const override; diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp index a3ba2ffadd3..eb2d93c6688 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.cpp +++ b/messagebus/src/vespa/messagebus/routablequeue.cpp @@ -2,6 +2,8 @@ #include "routablequeue.h" +using namespace std::chrono; + namespace mbus { RoutableQueue::RoutableQueue() @@ -39,15 +41,14 @@ RoutableQueue::enqueue(Routable::UP r) Routable::UP RoutableQueue::dequeue(uint32_t msTimeout) { - FastOS_Time t; - t.SetNow(); - uint32_t msLeft = msTimeout; + steady_clock::time_point startTime = steady_clock::now(); + uint64_t msLeft = msTimeout; vespalib::MonitorGuard guard(_monitor); while (_queue.size() == 0 && msLeft > 0) { if (!guard.wait(msLeft) || _queue.size() > 0) { break; } - uint32_t elapsed = (uint32_t)t.MilliSecsToNow(); + uint64_t elapsed = duration_cast<milliseconds>(steady_clock::now() - startTime).count(); msLeft = (elapsed > msTimeout) ? 0 : msTimeout - elapsed; } if (_queue.size() == 0) { diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index 0ba6e0827b2..5385ebd8844 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -6,15 +6,14 @@ #include <vespa/messagebus/tracelevel.h> #include <vespa/vespalib/util/stringfmt.h> +using namespace std::chrono; + namespace mbus { Resender::Resender(IRetryPolicy::SP retryPolicy) : _queue(), - _retryPolicy(retryPolicy), - _time() -{ - _time.SetNow(); -} + _retryPolicy(retryPolicy) +{ } Resender::~Resender() { @@ -30,18 +29,15 @@ Resender::resendScheduled() typedef std::vector<RoutingNode*> NodeList; NodeList sendList; - double now = _time.MilliSecsToNow(); + time_point now = steady_clock::now(); while (!_queue.empty() && _queue.top().first <= now) { sendList.push_back(_queue.top().second); _queue.pop(); } - for (NodeList::iterator it = sendList.begin(); - it != sendList.end(); ++it) - { - (*it)->getTrace().trace(mbus::TraceLevel::COMPONENT, - "Resender resending message."); - (*it)->send(); + for (RoutingNode *node : sendList) { + node->getTrace().trace(mbus::TraceLevel::COMPONENT, "Resender resending message."); + node->send(); } } @@ -87,7 +83,7 @@ Resender::scheduleRetry(RoutingNode &node) TraceLevel::COMPONENT, vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); - _queue.push(Entry((uint64_t)(_time.MilliSecsToNow() + delay * 1000), &node)); + _queue.push(Entry(steady_clock::now() + milliseconds(static_cast<long>(delay * 1000)), &node)); return true; } diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h index 93752dcfd5c..68b49cde606 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.h +++ b/messagebus/src/vespa/messagebus/routing/resender.h @@ -5,7 +5,6 @@ #include <vespa/messagebus/queue.h> #include <vespa/messagebus/reply.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/fastos/time.h> #include <queue> #include <vector> @@ -23,18 +22,17 @@ class RoutingNode; class Resender { private: - typedef std::pair<uint64_t, RoutingNode*> Entry; + using time_point = std::chrono::steady_clock::time_point; + typedef std::pair<time_point , RoutingNode*> Entry; struct Cmp { bool operator()(const Entry &a, const Entry &b) { return (b.first < a.first); } }; - typedef std::priority_queue<Entry, std::vector<Entry>, Cmp> PriorityQueue; + using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>; PriorityQueue _queue; IRetryPolicy::SP _retryPolicy; - FastOS_Time _time; - public: /** * Convenience typedefs. diff --git a/messagebus/src/vespa/messagebus/systemtimer.cpp b/messagebus/src/vespa/messagebus/systemtimer.cpp index 6c787509fbf..0570d81ce68 100644 --- a/messagebus/src/vespa/messagebus/systemtimer.cpp +++ b/messagebus/src/vespa/messagebus/systemtimer.cpp @@ -1,15 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "systemtimer.h" -#include <vespa/fastos/time.h> +#include <chrono> +using namespace std::chrono; namespace mbus { uint64_t SystemTimer::getMilliTime() const { - FastOS_Time time; - time.SetNow(); - return (uint64_t)time.MilliSecs(); + return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count(); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp index f821021a482..f98a4be05c3 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp +++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp @@ -2,17 +2,12 @@ #include "receptor.h" -namespace mbus { +using namespace std::chrono; -Receptor::Receptor() - : IMessageHandler(), - IReplyHandler(), - _mon(), - _msg(), - _reply() -{ } +namespace mbus { -Receptor::~Receptor() {} +Receptor::Receptor() = default; +Receptor::~Receptor() = default; void Receptor::handleMessage(Message::UP msg) @@ -33,12 +28,11 @@ Receptor::handleReply(Reply::UP reply) Message::UP Receptor::getMessage(double maxWait) { - int ms = (int)(maxWait * 1000); - FastOS_Time startTime; - startTime.SetNow(); + int64_t ms = (int64_t)(maxWait * 1000); + steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_msg.get() == 0) { - int w = ms - (int)startTime.MilliSecsToNow(); + int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); if (w <= 0 || !guard.wait(w)) { break; } @@ -49,12 +43,11 @@ Receptor::getMessage(double maxWait) Reply::UP Receptor::getReply(double maxWait) { - int ms = (int)(maxWait * 1000); - FastOS_Time startTime; - startTime.SetNow(); + int64_t ms = (int)(maxWait * 1000); + steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_reply.get() == 0) { - int w = ms - (int)startTime.MilliSecsToNow(); + int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); if (w <= 0 || !guard.wait(w)) { break; } |