aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-11-20 21:45:17 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-11-20 21:45:17 +0000
commitafbe78369142bf8f02e89eb49117eef91a76e867 (patch)
tree6e6133dc989172ef1b212981ae62ae3ab4280eb5 /messagebus
parent3c9a7ec70c36bf238765588859fa7b6672a9ff8a (diff)
Address comments from code review.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/messageordering/messageordering.cpp2
-rw-r--r--messagebus/src/tests/routable/routable.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/common.h2
-rw-r--r--messagebus/src/vespa/messagebus/message.h1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp12
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h14
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendadapter.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h2
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.h3
-rw-r--r--messagebus/src/vespa/messagebus/steadytimer.h1
15 files changed, 30 insertions, 35 deletions
diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp
index 5a4074f7f02..520c3d3dea3 100644
--- a/messagebus/src/tests/messageordering/messageordering.cpp
+++ b/messagebus/src/tests/messageordering/messageordering.cpp
@@ -155,7 +155,7 @@ Test::Main()
ssp.setTimeout(400s);
SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp);
DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst);
- ASSERT_EQUAL(400.0, ssp.getTimeout().count());
+ ASSERT_EQUAL(400s, ssp.getTimeout());
// wait for slobrok registration
ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session"));
diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp
index 7c90a7f958f..a7a35508656 100644
--- a/messagebus/src/tests/routable/routable.cpp
+++ b/messagebus/src/tests/routable/routable.cpp
@@ -42,8 +42,8 @@ Test::Main()
EXPECT_EQUAL(3u, foo.getRetry());
EXPECT_EQUAL(1u, bar.getRetry());
EXPECT_TRUE(foo.getTimeReceived() >= bar.getTimeReceived());
- EXPECT_EQUAL(4u, foo.getTimeRemaining().count());
- EXPECT_EQUAL(2u, bar.getTimeRemaining().count());
+ EXPECT_EQUAL(4ms, foo.getTimeRemaining());
+ EXPECT_EQUAL(2ms, bar.getTimeRemaining());
}
{
// Test reply swap state.
diff --git a/messagebus/src/vespa/messagebus/CMakeLists.txt b/messagebus/src/vespa/messagebus/CMakeLists.txt
index 4dcc059ce81..1dfac1fbdac 100644
--- a/messagebus/src/vespa/messagebus/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/CMakeLists.txt
@@ -30,7 +30,7 @@ vespa_add_library(messagebus
sourcesession.cpp
sourcesessionparams.cpp
staticthrottlepolicy.cpp
- steadytimer.cpp
+ steadytimer.cpp
$<TARGET_OBJECTS:messagebus_routing>
$<TARGET_OBJECTS:messagebus_network>
INSTALL lib64
diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h
index c8de001bd97..df25bf17973 100644
--- a/messagebus/src/vespa/messagebus/common.h
+++ b/messagebus/src/vespa/messagebus/common.h
@@ -10,6 +10,8 @@ namespace mbus {
using string = vespalib::string;
using seconds = std::chrono::duration<double>;
+using milliseconds = std::chrono::milliseconds;
+
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h
index dce4d86450a..4021143a052 100644
--- a/messagebus/src/vespa/messagebus/message.h
+++ b/messagebus/src/vespa/messagebus/message.h
@@ -13,7 +13,6 @@ namespace mbus {
class Message : public Routable {
public:
using time_point = std::chrono::steady_clock::time_point;
- using milliseconds = std::chrono::milliseconds;
using UP = std::unique_ptr<Message>;
Message();
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 19e33cb02fd..5ae6b07c3fa 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -7,7 +7,6 @@
#include "rpcnetworkparams.h"
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/iprotocol.h>
-#include <vespa/messagebus/tracelevel.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/routing/routingnode.h>
#include <vespa/slobrok/sbregister.h>
@@ -15,7 +14,6 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
@@ -46,7 +44,7 @@ public:
_gate() {
ScheduleNow();
}
- ~SyncTask() = default;
+ ~SyncTask() override = default;
void await() {
_gate.await();
@@ -322,7 +320,7 @@ void
RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients)
{
SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self
- double timeout = ctx._msg.getTimeRemainingNow().count() / 1000.0;
+ seconds timeout = ctx._msg.getTimeRemainingNow();
for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) {
RoutingNode *&recipient = ctx._recipients[i];
@@ -335,7 +333,8 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients
namespace {
-void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) {
+void
+emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) {
if (recipient.hasServiceAddress()) {
// At this point the service addresses _should_ be RPCServiceAddress instances,
// but stay on the safe side of the tracks anyway.
@@ -352,7 +351,8 @@ void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& r
}
-vespalib::string RPCNetwork::buildRecipientListString(const SendContext& ctx) {
+vespalib::string
+RPCNetwork::buildRecipientListString(const SendContext& ctx) {
vespalib::asciistream s;
bool first = true;
for (const auto* recipient : ctx._recipients) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index bed2847f3c6..e0fae8eabd6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -126,7 +126,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
version.toString().c_str(), _clientIdent.c_str(),
- address.getServiceName().c_str(), ctx->getTimeout()));
+ address.getServiceName().c_str(), ctx->getTimeout().count()));
}
if (hop.getIgnoreResult()) {
@@ -141,7 +141,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
} else {
SendContext *ptr = ctx.release();
req->SetContext(FNET_Context(ptr));
- address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
+ address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout().count(), this);
}
}
@@ -164,7 +164,7 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) {
case FRTE_RPC_TIMEOUT:
error = Error(ErrorCode::TIMEOUT,
make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
- serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
+ serviceName.c_str(), ctx->getTimeout().count(), req->GetErrorMessage()));
break;
case FRTE_RPC_CONNECTION:
error = Error(ErrorCode::CONNECTION_ERROR,
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index e207f7c1812..e7bf5495974 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -38,7 +38,6 @@ class RPCSend : public RPCSendAdapter,
public:
class Params {
public:
- using milliseconds = std::chrono::milliseconds;
virtual ~Params() {}
virtual vespalib::Version getVersion() const = 0;
virtual vespalib::stringref getProtocol() const = 0;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
index 6462bb0691e..0b620b3b11f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
@@ -10,24 +10,22 @@ namespace mbus::network::internal {
* an rpc return value. This object is held as the context of an FRT_RPCRequest.
*/
class SendContext {
-private:
- mbus::RoutingNode &_recipient;
- mbus::Trace _trace;
- double _timeout;
-
public:
- using milliseconds = std::chrono::milliseconds;
using UP = std::unique_ptr<SendContext>;
SendContext(const SendContext &) = delete;
SendContext & operator = (const SendContext &) = delete;
SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining)
: _recipient(recipient),
_trace(recipient.getTrace().getLevel()),
- _timeout(timeRemaining.count() * 0.001)
+ _timeout(timeRemaining)
{ }
mbus::RoutingNode &getRecipient() { return _recipient; }
mbus::Trace &getTrace() { return _trace; }
- double getTimeout() { return _timeout; }
+ seconds getTimeout() { return _timeout; }
+private:
+ mbus::RoutingNode &_recipient;
+ mbus::Trace _trace;
+ seconds _timeout;
};
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
index 9b841811bb2..cc89bb022b9 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
@@ -2,8 +2,8 @@
#pragma once
#include <vespa/messagebus/blobref.h>
+#include <vespa/messagebus/common.h>
#include <vespa/vespalib/component/version.h>
-#include <chrono>
namespace mbus {
@@ -20,7 +20,6 @@ class RPCSendAdapter
protected:
RPCSendAdapter() = default;
public:
- using milliseconds = std::chrono::milliseconds;
RPCSendAdapter(const RPCSendAdapter &) = delete;
RPCSendAdapter & operator = (const RPCSendAdapter &) = delete;
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index 04423b5e90b..bb3bf1d172d 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -23,7 +23,7 @@ RPCTarget::~RPCTarget()
}
void
-RPCTarget::resolveVersion(double timeout, RPCTarget::IVersionHandler &handler)
+RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler)
{
bool hasVersion = false;
bool shouldInvoke = false;
@@ -47,7 +47,7 @@ RPCTarget::resolveVersion(double timeout, RPCTarget::IVersionHandler &handler)
} else if (shouldInvoke) {
FRT_RPCRequest *req = _orb.AllocRPCRequest();
req->SetMethodName("mbus.getVersion");
- _target.InvokeAsync(req, timeout, this);
+ _target.InvokeAsync(req, timeout.count(), this);
}
}
@@ -84,10 +84,8 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
_versionHandlers.swap(handlers);
_state = PROCESSING_HANDLERS;
}
- for (HandlerList::iterator it = handlers.begin();
- it != handlers.end(); ++it)
- {
- (*it)->handleVersion(_version.get());
+ for (IVersionHandler * handler : handlers) {
+ handler->handleVersion(_version.get());
}
{
vespalib::MonitorGuard guard(_lock);
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index cd1028d6df4..9c089381de7 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -88,7 +88,7 @@ public:
* @param timeout The timeout for the request in milliseconds.
* @param handler The handler to be called once the version is available.
*/
- void resolveVersion(double timeout, IVersionHandler &handler);
+ void resolveVersion(seconds timeout, IVersionHandler &handler);
/**
* @return true if the FRT target is valid or has been invoked (which
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index 26c30d92375..979a9d61491 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -74,7 +74,7 @@ Resender::scheduleRetry(RoutingNode &node)
if (delay < 0) {
delay = _retryPolicy->getRetryDelay(retry);
}
- milliseconds delayMS(static_cast<long>(delay * 1000));
+ milliseconds delayMS(long(delay * 1000));
if (msg.getTimeRemainingNow() <= delayMS) {
node.addError(ErrorCode::TIMEOUT, "Timeout exceeded by resender, giving up.");
return false;
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h
index bc4cad48ba5..9ee17280d40 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.h
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h
@@ -7,13 +7,12 @@
namespace mbus {
- /**
+/**
* To facilitate several configuration parameters to the {@link MessageBus#createSourceSession(ReplyHandler,
* SourceSessionParams)}, all parameters are held by this class. This class has reasonable default values for each
* parameter.
*
* @author Simon Thoresen Hult
- * @version $Id$
*/
class SourceSessionParams {
private:
diff --git a/messagebus/src/vespa/messagebus/steadytimer.h b/messagebus/src/vespa/messagebus/steadytimer.h
index 12a175368ac..919dd54b7d5 100644
--- a/messagebus/src/vespa/messagebus/steadytimer.h
+++ b/messagebus/src/vespa/messagebus/steadytimer.h
@@ -12,6 +12,7 @@ namespace mbus {
*/
class SteadyTimer : public ITimer {
public:
+ //TODO Return chrono::duration
uint64_t getMilliTime() const override;
};