diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-20 21:45:17 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-11-20 21:45:17 +0000 |
commit | afbe78369142bf8f02e89eb49117eef91a76e867 (patch) | |
tree | 6e6133dc989172ef1b212981ae62ae3ab4280eb5 /messagebus | |
parent | 3c9a7ec70c36bf238765588859fa7b6672a9ff8a (diff) |
Address comments from code review.
Diffstat (limited to 'messagebus')
15 files changed, 30 insertions, 35 deletions
diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp index 5a4074f7f02..520c3d3dea3 100644 --- a/messagebus/src/tests/messageordering/messageordering.cpp +++ b/messagebus/src/tests/messageordering/messageordering.cpp @@ -155,7 +155,7 @@ Test::Main() ssp.setTimeout(400s); SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp); DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst); - ASSERT_EQUAL(400.0, ssp.getTimeout().count()); + ASSERT_EQUAL(400s, ssp.getTimeout()); // wait for slobrok registration ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session")); diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp index 7c90a7f958f..a7a35508656 100644 --- a/messagebus/src/tests/routable/routable.cpp +++ b/messagebus/src/tests/routable/routable.cpp @@ -42,8 +42,8 @@ Test::Main() EXPECT_EQUAL(3u, foo.getRetry()); EXPECT_EQUAL(1u, bar.getRetry()); EXPECT_TRUE(foo.getTimeReceived() >= bar.getTimeReceived()); - EXPECT_EQUAL(4u, foo.getTimeRemaining().count()); - EXPECT_EQUAL(2u, bar.getTimeRemaining().count()); + EXPECT_EQUAL(4ms, foo.getTimeRemaining()); + EXPECT_EQUAL(2ms, bar.getTimeRemaining()); } { // Test reply swap state. diff --git a/messagebus/src/vespa/messagebus/CMakeLists.txt b/messagebus/src/vespa/messagebus/CMakeLists.txt index 4dcc059ce81..1dfac1fbdac 100644 --- a/messagebus/src/vespa/messagebus/CMakeLists.txt +++ b/messagebus/src/vespa/messagebus/CMakeLists.txt @@ -30,7 +30,7 @@ vespa_add_library(messagebus sourcesession.cpp sourcesessionparams.cpp staticthrottlepolicy.cpp - steadytimer.cpp + steadytimer.cpp $<TARGET_OBJECTS:messagebus_routing> $<TARGET_OBJECTS:messagebus_network> INSTALL lib64 diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h index c8de001bd97..df25bf17973 100644 --- a/messagebus/src/vespa/messagebus/common.h +++ b/messagebus/src/vespa/messagebus/common.h @@ -10,6 +10,8 @@ namespace mbus { using string = vespalib::string; using seconds = std::chrono::duration<double>; +using milliseconds = std::chrono::milliseconds; + } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index dce4d86450a..4021143a052 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -13,7 +13,6 @@ namespace mbus { class Message : public Routable { public: using time_point = std::chrono::steady_clock::time_point; - using milliseconds = std::chrono::milliseconds; using UP = std::unique_ptr<Message>; Message(); diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 19e33cb02fd..5ae6b07c3fa 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -7,7 +7,6 @@ #include "rpcnetworkparams.h" #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/iprotocol.h> -#include <vespa/messagebus/tracelevel.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/routing/routingnode.h> #include <vespa/slobrok/sbregister.h> @@ -15,7 +14,6 @@ #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> @@ -46,7 +44,7 @@ public: _gate() { ScheduleNow(); } - ~SyncTask() = default; + ~SyncTask() override = default; void await() { _gate.await(); @@ -322,7 +320,7 @@ void RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients) { SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self - double timeout = ctx._msg.getTimeRemainingNow().count() / 1000.0; + seconds timeout = ctx._msg.getTimeRemainingNow(); for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; @@ -335,7 +333,8 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients namespace { -void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) { +void +emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) { if (recipient.hasServiceAddress()) { // At this point the service addresses _should_ be RPCServiceAddress instances, // but stay on the safe side of the tracks anyway. @@ -352,7 +351,8 @@ void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& r } -vespalib::string RPCNetwork::buildRecipientListString(const SendContext& ctx) { +vespalib::string +RPCNetwork::buildRecipientListString(const SendContext& ctx) { vespalib::asciistream s; bool first = true; for (const auto* recipient : ctx._recipients) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index bed2847f3c6..e0fae8eabd6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -126,7 +126,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.", version.toString().c_str(), _clientIdent.c_str(), - address.getServiceName().c_str(), ctx->getTimeout())); + address.getServiceName().c_str(), ctx->getTimeout().count())); } if (hop.getIgnoreResult()) { @@ -141,7 +141,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, } else { SendContext *ptr = ctx.release(); req->SetContext(FNET_Context(ptr)); - address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this); + address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout().count(), this); } } @@ -164,7 +164,7 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) { case FRTE_RPC_TIMEOUT: error = Error(ErrorCode::TIMEOUT, make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s", - serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage())); + serviceName.c_str(), ctx->getTimeout().count(), req->GetErrorMessage())); break; case FRTE_RPC_CONNECTION: error = Error(ErrorCode::CONNECTION_ERROR, diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index e207f7c1812..e7bf5495974 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -38,7 +38,6 @@ class RPCSend : public RPCSendAdapter, public: class Params { public: - using milliseconds = std::chrono::milliseconds; virtual ~Params() {} virtual vespalib::Version getVersion() const = 0; virtual vespalib::stringref getProtocol() const = 0; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h index 6462bb0691e..0b620b3b11f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h @@ -10,24 +10,22 @@ namespace mbus::network::internal { * an rpc return value. This object is held as the context of an FRT_RPCRequest. */ class SendContext { -private: - mbus::RoutingNode &_recipient; - mbus::Trace _trace; - double _timeout; - public: - using milliseconds = std::chrono::milliseconds; using UP = std::unique_ptr<SendContext>; SendContext(const SendContext &) = delete; SendContext & operator = (const SendContext &) = delete; SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining) : _recipient(recipient), _trace(recipient.getTrace().getLevel()), - _timeout(timeRemaining.count() * 0.001) + _timeout(timeRemaining) { } mbus::RoutingNode &getRecipient() { return _recipient; } mbus::Trace &getTrace() { return _trace; } - double getTimeout() { return _timeout; } + seconds getTimeout() { return _timeout; } +private: + mbus::RoutingNode &_recipient; + mbus::Trace _trace; + seconds _timeout; }; /** diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h index 9b841811bb2..cc89bb022b9 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h @@ -2,8 +2,8 @@ #pragma once #include <vespa/messagebus/blobref.h> +#include <vespa/messagebus/common.h> #include <vespa/vespalib/component/version.h> -#include <chrono> namespace mbus { @@ -20,7 +20,6 @@ class RPCSendAdapter protected: RPCSendAdapter() = default; public: - using milliseconds = std::chrono::milliseconds; RPCSendAdapter(const RPCSendAdapter &) = delete; RPCSendAdapter & operator = (const RPCSendAdapter &) = delete; /** diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index 04423b5e90b..bb3bf1d172d 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -23,7 +23,7 @@ RPCTarget::~RPCTarget() } void -RPCTarget::resolveVersion(double timeout, RPCTarget::IVersionHandler &handler) +RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler) { bool hasVersion = false; bool shouldInvoke = false; @@ -47,7 +47,7 @@ RPCTarget::resolveVersion(double timeout, RPCTarget::IVersionHandler &handler) } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); req->SetMethodName("mbus.getVersion"); - _target.InvokeAsync(req, timeout, this); + _target.InvokeAsync(req, timeout.count(), this); } } @@ -84,10 +84,8 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) _versionHandlers.swap(handlers); _state = PROCESSING_HANDLERS; } - for (HandlerList::iterator it = handlers.begin(); - it != handlers.end(); ++it) - { - (*it)->handleVersion(_version.get()); + for (IVersionHandler * handler : handlers) { + handler->handleVersion(_version.get()); } { vespalib::MonitorGuard guard(_lock); diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index cd1028d6df4..9c089381de7 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -88,7 +88,7 @@ public: * @param timeout The timeout for the request in milliseconds. * @param handler The handler to be called once the version is available. */ - void resolveVersion(double timeout, IVersionHandler &handler); + void resolveVersion(seconds timeout, IVersionHandler &handler); /** * @return true if the FRT target is valid or has been invoked (which diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index 26c30d92375..979a9d61491 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -74,7 +74,7 @@ Resender::scheduleRetry(RoutingNode &node) if (delay < 0) { delay = _retryPolicy->getRetryDelay(retry); } - milliseconds delayMS(static_cast<long>(delay * 1000)); + milliseconds delayMS(long(delay * 1000)); if (msg.getTimeRemainingNow() <= delayMS) { node.addError(ErrorCode::TIMEOUT, "Timeout exceeded by resender, giving up."); return false; diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h index bc4cad48ba5..9ee17280d40 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.h +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h @@ -7,13 +7,12 @@ namespace mbus { - /** +/** * To facilitate several configuration parameters to the {@link MessageBus#createSourceSession(ReplyHandler, * SourceSessionParams)}, all parameters are held by this class. This class has reasonable default values for each * parameter. * * @author Simon Thoresen Hult - * @version $Id$ */ class SourceSessionParams { private: diff --git a/messagebus/src/vespa/messagebus/steadytimer.h b/messagebus/src/vespa/messagebus/steadytimer.h index 12a175368ac..919dd54b7d5 100644 --- a/messagebus/src/vespa/messagebus/steadytimer.h +++ b/messagebus/src/vespa/messagebus/steadytimer.h @@ -12,6 +12,7 @@ namespace mbus { */ class SteadyTimer : public ITimer { public: + //TODO Return chrono::duration uint64_t getMilliTime() const override; }; |