summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-12-03 21:45:53 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-12-04 18:26:06 +0000
commitb8e151a435ccec1ecc03d98bac5b59f4f14514be (patch)
tree140efda301a7e5adc407c44061ba5b0bb41dd212
parent7700f411ea6f4a3e7c0599fae239ec84c18c0038 (diff)
timeout as duration
Conflicts: messagebus/src/vespa/messagebus/testlib/testserver.cpp
-rw-r--r--documentapi/src/tests/policies/policies_test.cpp52
-rw-r--r--documentapi/src/tests/policies/testframe.cpp4
-rw-r--r--documentapi/src/tests/policyfactory/policyfactory.cpp6
-rw-r--r--documentapi/src/tests/routablefactory/routablefactory.cpp34
-rw-r--r--messagebus/src/tests/advancedrouting/advancedrouting.cpp22
-rw-r--r--messagebus/src/tests/choke/choke.cpp30
-rw-r--r--messagebus/src/tests/context/context.cpp18
-rw-r--r--messagebus/src/tests/messagebus/messagebus.cpp68
-rw-r--r--messagebus/src/tests/resender/resender.cpp68
-rw-r--r--messagebus/src/tests/routable/routable.cpp4
-rw-r--r--messagebus/src/tests/routablequeue/routablequeue.cpp20
-rw-r--r--messagebus/src/tests/routing/routing.cpp133
-rw-r--r--messagebus/src/tests/routingcontext/routingcontext.cpp2
-rw-r--r--messagebus/src/tests/sendadapter/sendadapter.cpp10
-rw-r--r--messagebus/src/tests/sequencer/sequencer.cpp8
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp6
-rw-r--r--messagebus/src/tests/sourcesession/sourcesession.cpp28
-rw-r--r--messagebus/src/tests/throttling/throttling.cpp26
-rw-r--r--messagebus/src/tests/timeout/timeout.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/common.h9
-rw-r--r--messagebus/src/vespa/messagebus/message.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/message.h9
-rw-r--r--messagebus/src/vespa/messagebus/network/inetwork.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp12
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendadapter.h4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h2
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.h3
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp8
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.h4
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.h7
-rw-r--r--messagebus/src/vespa/messagebus/steadytimer.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.h6
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp1
-rw-r--r--messagebus_test/src/tests/error/cpp-client.cpp2
-rw-r--r--messagebus_test/src/tests/trace/trace.cpp4
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp18
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp4
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp36
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp4
-rw-r--r--storage/src/tests/storageserver/documentapiconvertertest.cpp4
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp8
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp4
-rw-r--r--storage/src/tests/visiting/commandqueuetest.cpp79
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp54
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h10
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.h2
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp9
-rw-r--r--storage/src/vespa/storage/visiting/commandqueue.h6
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp8
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h2
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp2
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp34
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp4
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp4
-rw-r--r--storageapi/src/vespa/storageapi/message/visitor.cpp18
-rw-r--r--storageapi/src/vespa/storageapi/message/visitor.h31
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp7
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagecommand.h6
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/sync.h2
84 files changed, 559 insertions, 596 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index 93c5d51fef5..3dbc9dd7e69 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -92,8 +92,10 @@ public:
TEST_APPHOOK(Test);
-Test::Test() {}
-Test::~Test() {}
+Test::Test() = default;
+Test::~Test() = default;
+
+const vespalib::duration TIMEOUT = 600s;
int
Test::Main() {
@@ -230,7 +232,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
EXPECT_EQUAL(servers.size(), lst.size());
for (uint32_t i = 0; i < servers.size(); ++i) {
@@ -332,14 +334,14 @@ Test::testExternSend()
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
- ASSERT_TRUE((msg = ir.getMessage(600)));
+ ASSERT_TRUE((msg = ir.getMessage(TIMEOUT)));
is->forward(std::move(msg));
- ASSERT_TRUE((msg = dr.getMessage(600)));
+ ASSERT_TRUE((msg = dr.getMessage(TIMEOUT)));
ds->acknowledge(std::move(msg));
- mbus::Reply::UP reply = ir.getReply(600);
+ mbus::Reply::UP reply = ir.getReply(TIMEOUT);
ASSERT_TRUE(reply);
is->forward(std::move(reply));
- ASSERT_TRUE((reply = sr.getReply(600)));
+ ASSERT_TRUE((reply = sr.getReply(TIMEOUT)));
fprintf(stderr, "%s", reply->getTrace().toString().c_str());
}
@@ -366,9 +368,9 @@ Test::testExternMultipleSlobroks()
mbus::Message::UP msg(new GetDocumentMessage(DocumentId("id:ns:testdoc::"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
- ASSERT_TRUE((msg = dr.getMessage(600)));
+ ASSERT_TRUE((msg = dr.getMessage(TIMEOUT)));
ds->acknowledge(std::move(msg));
- mbus::Reply::UP reply = sr.getReply(600);
+ mbus::Reply::UP reply = sr.getReply(TIMEOUT);
ASSERT_TRUE(reply);
}
{
@@ -382,9 +384,9 @@ Test::testExternMultipleSlobroks()
mbus::Message::UP msg(new GetDocumentMessage(DocumentId("id:ns:testdoc::"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
- ASSERT_TRUE((msg = dr.getMessage(600)));
+ ASSERT_TRUE((msg = dr.getMessage(TIMEOUT)));
ds->acknowledge(std::move(msg));
- mbus::Reply::UP reply = sr.getReply(600);
+ mbus::Reply::UP reply = sr.getReply(TIMEOUT);
ASSERT_TRUE(reply);
}
}
@@ -410,7 +412,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
EXPECT_EQUAL(10u, lst.size());
@@ -423,7 +425,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
EXPECT_EQUAL(1u, lst.size());
EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin());
@@ -466,8 +468,8 @@ Test::testLocalServiceCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600));
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT));
}
void
@@ -543,8 +545,8 @@ Test::testRoundRobinCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600));
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT));
}
void
@@ -573,7 +575,7 @@ Test::multipleGetRepliesAreMergedToFoundDocument()
mbus::Reply::UP reply(new GetDocumentReply(std::move(doc)));
selected[i]->handleReply(std::move(reply));
}
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
+ mbus::Reply::UP reply = frame.getReceptor().getReply(TIMEOUT);
EXPECT_TRUE(reply);
EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType());
EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified());
@@ -647,7 +649,7 @@ Test::testDocumentRouteSelectorIgnore()
make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff"))));
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 0));
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
+ mbus::Reply::UP reply = frame.getReceptor().getReply(TIMEOUT);
ASSERT_TRUE(reply);
EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType());
EXPECT_EQUAL(0u, reply->getNumErrors());
@@ -940,7 +942,7 @@ Test::testSubsetService()
ASSERT_TRUE(frame.select(leaf, 1));
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
ASSERT_TRUE(lst.size() > 1); // must have requeried
@@ -959,7 +961,7 @@ Test::testSubsetService()
prev = next;
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
// Test requerying for dropping nodes.
@@ -976,7 +978,7 @@ Test::testSubsetService()
mbus::Reply::UP reply(new mbus::EmptyReply());
reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route));
leaf[0]->handleReply(std::move(reply));
- ASSERT_TRUE(frame.getReceptor().getReply(600));
+ ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT));
}
EXPECT_EQUAL(10u, lst.size());
@@ -1018,8 +1020,8 @@ Test::testSubsetServiceCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600));
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT));
}
bool
@@ -1034,7 +1036,7 @@ Test::trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string>
} else {
frame.select(leaf, 0);
}
- if( ! frame.getReceptor().getReply(600)) {
+ if( ! frame.getReceptor().getReply(TIMEOUT)) {
LOG(error, "Reply failed to propagate to reply handler.");
return false;
}
diff --git a/documentapi/src/tests/policies/testframe.cpp b/documentapi/src/tests/policies/testframe.cpp
index 4cdc5d4ba14..9834e534a56 100644
--- a/documentapi/src/tests/policies/testframe.cpp
+++ b/documentapi/src/tests/policies/testframe.cpp
@@ -122,7 +122,7 @@ TestFrame::testSelect(const std::vector<string> &expected)
}
node->handleReply(std::make_unique<mbus::EmptyReply>());
}
- if (_handler.getReply(600).get() == nullptr) {
+ if (_handler.getReply(600s).get() == nullptr) {
LOG(error, "Reply not propagated to handler.");
return false;
}
@@ -166,7 +166,7 @@ TestFrame::testMerge(const ReplyMap &replies,
node->handleReply(std::move(ret));
}
- mbus::Reply::UP reply = _handler.getReply(600);
+ mbus::Reply::UP reply = _handler.getReply(600s);
if (reply.get() == nullptr) {
LOG(error, "Reply not propagated to handler.");
return false;
diff --git a/documentapi/src/tests/policyfactory/policyfactory.cpp b/documentapi/src/tests/policyfactory/policyfactory.cpp
index 877ade22e2a..729818c5c4a 100644
--- a/documentapi/src/tests/policyfactory/policyfactory.cpp
+++ b/documentapi/src/tests/policyfactory/policyfactory.cpp
@@ -73,6 +73,8 @@ createMessage()
TEST_SETUP(Test);
+const vespalib::duration TIMEOUT = 600s;
+
int
Test::Main()
{
@@ -89,7 +91,7 @@ Test::Main()
mbus::Route route = mbus::Route::parse("[MyPolicy]");
ASSERT_TRUE(src->send(createMessage(), route).isAccepted());
- mbus::Reply::UP reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(600);
+ mbus::Reply::UP reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(TIMEOUT);
ASSERT_TRUE(reply);
fprintf(stderr, "%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
@@ -101,7 +103,7 @@ Test::Main()
protocol->putRoutingPolicyFactory("MyPolicy", std::make_shared<MyFactory>());
ASSERT_TRUE(src->send(createMessage(), route).isAccepted());
- reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(600);
+ reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(TIMEOUT);
ASSERT_TRUE(reply);
fprintf(stderr, "%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
diff --git a/documentapi/src/tests/routablefactory/routablefactory.cpp b/documentapi/src/tests/routablefactory/routablefactory.cpp
index 3f94d120d66..32a36ef0b59 100644
--- a/documentapi/src/tests/routablefactory/routablefactory.cpp
+++ b/documentapi/src/tests/routablefactory/routablefactory.cpp
@@ -167,6 +167,8 @@ Test::Main()
//
///////////////////////////////////////////////////////////////////////////////
+const vespalib::duration TIMEOUT = 600s;
+
void
Test::testFactory(TestData &data)
{
@@ -174,8 +176,8 @@ Test::testFactory(TestData &data)
// Source should fail to encode the message.
EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted());
- mbus::Reply::UP reply = data._srcHandler.getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ mbus::Reply::UP reply = data._srcHandler.getReply(TIMEOUT);
+ ASSERT_TRUE(reply);
fprintf(stderr, "%s\n", reply->getTrace().toString().c_str());
ASSERT_TRUE(reply->hasErrors());
EXPECT_EQUAL((uint32_t)mbus::ErrorCode::ENCODE_ERROR, reply->getError(0).getCode());
@@ -185,8 +187,8 @@ Test::testFactory(TestData &data)
data._srcProtocol->putRoutableFactory(MyMessage::TYPE, IRoutableFactory::SP(new MyMessageFactory()),
vespalib::VersionSpecification());
EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted());
- reply = data._srcHandler.getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ reply = data._srcHandler.getReply(TIMEOUT);
+ ASSERT_TRUE(reply);
fprintf(stderr, "%s\n", reply->getTrace().toString().c_str());
EXPECT_TRUE(reply->hasErrors());
EXPECT_EQUAL((uint32_t)mbus::ErrorCode::DECODE_ERROR, reply->getError(0).getCode());
@@ -196,13 +198,13 @@ Test::testFactory(TestData &data)
data._dstProtocol->putRoutableFactory(MyMessage::TYPE, IRoutableFactory::SP(new MyMessageFactory()),
vespalib::VersionSpecification());
EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted());
- mbus::Message::UP msg = data._dstHandler.getMessage(600);
- ASSERT_TRUE(msg.get() != NULL);
+ mbus::Message::UP msg = data._dstHandler.getMessage(TIMEOUT);
+ ASSERT_TRUE(msg);
reply.reset(new MyReply());
reply->swapState(*msg);
data._dstSession->reply(std::move(reply));
- reply = data._srcHandler.getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ reply = data._srcHandler.getReply(TIMEOUT);
+ ASSERT_TRUE(reply);
fprintf(stderr, "%s\n", reply->getTrace().toString().c_str());
EXPECT_TRUE(reply->hasErrors());
EXPECT_EQUAL((uint32_t)mbus::ErrorCode::ENCODE_ERROR, reply->getError(0).getCode());
@@ -212,13 +214,13 @@ Test::testFactory(TestData &data)
data._dstProtocol->putRoutableFactory(MyReply::TYPE, IRoutableFactory::SP(new MyReplyFactory()),
vespalib::VersionSpecification());
EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted());
- msg = data._dstHandler.getMessage(600);
- ASSERT_TRUE(msg.get() != NULL);
+ msg = data._dstHandler.getMessage(TIMEOUT);
+ ASSERT_TRUE(msg);
reply.reset(new MyReply());
reply->swapState(*msg);
data._dstSession->reply(std::move(reply));
- reply = data._srcHandler.getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ reply = data._srcHandler.getReply(TIMEOUT);
+ ASSERT_TRUE(reply);
fprintf(stderr, "%s\n", reply->getTrace().toString().c_str());
EXPECT_TRUE(reply->hasErrors());
EXPECT_EQUAL((uint32_t)mbus::ErrorCode::DECODE_ERROR, reply->getError(0).getCode());
@@ -228,13 +230,13 @@ Test::testFactory(TestData &data)
data._srcProtocol->putRoutableFactory(MyReply::TYPE, IRoutableFactory::SP(new MyReplyFactory()),
vespalib::VersionSpecification());
EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted());
- msg = data._dstHandler.getMessage(600);
- ASSERT_TRUE(msg.get() != NULL);
+ msg = data._dstHandler.getMessage(TIMEOUT);
+ ASSERT_TRUE(msg);
reply.reset(new MyReply());
reply->swapState(*msg);
data._dstSession->reply(std::move(reply));
- reply = data._srcHandler.getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ reply = data._srcHandler.getReply(TIMEOUT);
+ ASSERT_TRUE(reply);
fprintf(stderr, "%s\n", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
index b18653d272d..f1557b8a305 100644
--- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp
+++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
@@ -115,7 +115,7 @@ Test::Main()
void
Test::testAdvanced(TestData &data)
{
- const double TIMEOUT = 60;
+ const duration TIMEOUT = 60s;
IProtocol::SP protocol(new SimpleProtocol());
SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol);
simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE)));
@@ -130,41 +130,41 @@ Test::testAdvanced(TestData &data)
// Initial send.
Message::UP msg = data._fooHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._fooSession->acknowledge(std::move(msg));
msg = data._barHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "bar"));
data._barSession->reply(std::move(reply));
msg = data._bazHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
reply.reset(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz1"));
data._bazSession->reply(std::move(reply));
// First retry.
- msg = data._fooHandler.getMessage(0);
+ msg = data._fooHandler.getMessage(duration::zero());
ASSERT_TRUE(msg.get() == NULL);
msg = data._barHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._barSession->acknowledge(std::move(msg));
msg = data._bazHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
reply.reset(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz2"));
data._bazSession->reply(std::move(reply));
// Second retry.
- msg = data._fooHandler.getMessage(0);
+ msg = data._fooHandler.getMessage(duration::zero());
ASSERT_TRUE(msg.get() == NULL);
- msg = data._barHandler.getMessage(0);
+ msg = data._barHandler.getMessage(duration::zero());
ASSERT_TRUE(msg.get() == NULL);
msg = data._bazHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
reply.reset(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::FATAL_ERROR, "baz3"));
@@ -172,7 +172,7 @@ Test::testAdvanced(TestData &data)
// Done.
reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(2u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::FATAL_ERROR, reply->getError(0).getCode());
diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp
index 105da2b3bd1..e10d87ed2ba 100644
--- a/messagebus/src/tests/choke/choke.cpp
+++ b/messagebus/src/tests/choke/choke.cpp
@@ -71,13 +71,13 @@ TestData::start()
_srcSession = _srcServer.mb.createSourceSession(SourceSessionParams()
.setThrottlePolicy(IThrottlePolicy::SP())
.setReplyHandler(_srcHandler));
- if (_srcSession.get() == NULL) {
+ if ( ! _srcSession) {
return false;
}
_dstSession = _dstServer.mb.createDestinationSession(DestinationSessionParams()
.setName("session")
.setMessageHandler(_dstHandler));
- if (_dstSession.get() == NULL) {
+ if ( ! _dstSession) {
return false;
}
if (!_srcServer.waitSlobrok("dst/session", 1u)) {
@@ -108,7 +108,7 @@ Test::Main()
TEST_DONE();
}
-static const double TIMEOUT = 120;
+static const duration TIMEOUT = 120s;
////////////////////////////////////////////////////////////////////////////////
//
@@ -131,11 +131,11 @@ Test::testMaxCount(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
if (i < max) {
Message::UP msg = data._dstHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
lst.push_back(msg.release());
} else {
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode());
}
@@ -146,14 +146,14 @@ Test::testMaxCount(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
msg = reply->getMessage();
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted());
msg = data._dstHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
lst.push_back(msg.release());
}
while (!lst.empty()) {
@@ -163,7 +163,7 @@ Test::testMaxCount(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
EXPECT_EQUAL(0u, data._dstServer.mb.getPendingCount());
@@ -185,11 +185,11 @@ Test::testMaxSize(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
if (i < max) {
Message::UP msg = data._dstHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
lst.push_back(msg.release());
} else {
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode());
}
@@ -200,14 +200,14 @@ Test::testMaxSize(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
msg = reply->getMessage();
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted());
msg = data._dstHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
lst.push_back(msg.release());
}
while (!lst.empty()) {
@@ -217,7 +217,7 @@ Test::testMaxSize(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
EXPECT_EQUAL(0u, data._dstServer.mb.getPendingSize());
diff --git a/messagebus/src/tests/context/context.cpp b/messagebus/src/tests/context/context.cpp
index de9dd1b83a6..39a7ca7b467 100644
--- a/messagebus/src/tests/context/context.cpp
+++ b/messagebus/src/tests/context/context.cpp
@@ -1,17 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/intermediatesession.h>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routablequeue.h>
#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/sourcesessionparams.h>
-#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
@@ -24,7 +20,7 @@ struct Handler : public IMessageHandler
Handler(MessageBus &mb) : session() {
session = mb.createDestinationSession("session", true, *this);
}
- ~Handler() {
+ ~Handler() override {
session.reset();
}
void handleMessage(Message::UP msg) override {
@@ -81,18 +77,18 @@ Test::Main()
}
EXPECT_EQUAL(queue.size(), 3u);
{
- Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
- ASSERT_TRUE(reply.get() != 0);
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release());
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(reply->getContext().value.UINT64, 10u);
}
{
- Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
- ASSERT_TRUE(reply.get() != 0);
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release());
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(reply->getContext().value.UINT64, 20u);
}
{
- Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
- ASSERT_TRUE(reply.get() != 0);
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release());
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(reply->getContext().value.UINT64, 30u);
}
TEST_DONE();
diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp
index 7434941a900..cf249e6eaec 100644
--- a/messagebus/src/tests/messagebus/messagebus.cpp
+++ b/messagebus/src/tests/messagebus/messagebus.cpp
@@ -21,7 +21,7 @@ struct Base {
Base() : queue() {}
virtual ~Base() {
while (queue.size() > 0) {
- Routable::UP r = queue.dequeue(0);
+ Routable::UP r = queue.dequeue();
r->getCallStack().discard();
}
}
@@ -219,8 +219,8 @@ Test::testSendToAny()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP msg = p->queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = p->queue.dequeue();
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
msg->swapState(*reply);
reply->addError(Error(ErrorCode::FATAL_ERROR, ""));
@@ -229,8 +229,8 @@ Test::testSendToAny()
}
EXPECT_TRUE(client->waitQueueSize(300));
while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue(0);
- ASSERT_TRUE(reply.get() != 0);
+ Routable::UP reply = client->queue.dequeue();
+ ASSERT_TRUE(reply);
ASSERT_TRUE(reply->isReply());
EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1);
}
@@ -262,8 +262,8 @@ Test::testSendToCol()
for (uint32_t i = 0; i < searchVec.size(); ++i) {
Search *s = searchVec[i];
while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = s->queue.dequeue();
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
msg->swapState(*reply);
s->session->reply(std::move(reply));
@@ -273,8 +273,8 @@ Test::testSendToCol()
FastOS_Thread::Sleep(100);
client->waitQueueSize(300);
while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue(0);
- ASSERT_TRUE(reply.get() != 0);
+ Routable::UP reply = client->queue.dequeue();
+ ASSERT_TRUE(reply);
ASSERT_TRUE(reply->isReply());
EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0);
}
@@ -296,8 +296,8 @@ Test::testSendToAnyThenCol()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = p->queue.dequeue();
+ ASSERT_TRUE(r);
p->session->forward(std::move(r));
}
}
@@ -316,8 +316,8 @@ Test::testSendToAnyThenCol()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = p->queue.dequeue();
+ ASSERT_TRUE(r);
p->session->forward(std::move(r));
}
}
@@ -328,8 +328,8 @@ Test::testSendToAnyThenCol()
for (uint32_t i = 0; i < searchVec.size(); ++i) {
Search *s = searchVec[i];
while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = s->queue.dequeue();
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
msg->swapState(*reply);
s->session->reply(std::move(reply));
@@ -341,8 +341,8 @@ Test::testSendToAnyThenCol()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = p->queue.dequeue();
+ ASSERT_TRUE(r);
p->session->forward(std::move(r));
}
}
@@ -350,8 +350,8 @@ Test::testSendToAnyThenCol()
FastOS_Thread::Sleep(100);
client->waitQueueSize(300);
while (client->queue.size() > 0) {
- Routable::UP reply = client->queue.dequeue(0);
- ASSERT_TRUE(reply.get() != 0);
+ Routable::UP reply = client->queue.dequeue();
+ ASSERT_TRUE(reply);
ASSERT_TRUE(reply->isReply());
EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0);
}
@@ -423,8 +423,8 @@ void
Test::assertDst(Search& dst)
{
ASSERT_TRUE(dst.waitQueueSize(1));
- Routable::UP msg = dst.queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = dst.queue.dequeue();
+ ASSERT_TRUE(msg);
dst.session->acknowledge(Message::UP(static_cast<Message*>(msg.release())));
}
@@ -432,8 +432,8 @@ void
Test::assertItr(DocProc& itr)
{
ASSERT_TRUE(itr.waitQueueSize(1));
- Routable::UP msg = itr.queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = itr.queue.dequeue();
+ ASSERT_TRUE(msg);
itr.session->forward(std::move(msg));
}
@@ -441,8 +441,8 @@ void
Test::assertSrc(Client& src)
{
ASSERT_TRUE(src.waitQueueSize(1));
- Routable::UP msg = src.queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = src.queue.dequeue();
+ ASSERT_TRUE(msg);
}
void
@@ -485,8 +485,8 @@ Test::debugTrace()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = p->queue.dequeue();
+ ASSERT_TRUE(r);
p->session->forward(std::move(r));
}
}
@@ -497,8 +497,8 @@ Test::debugTrace()
for (uint32_t i = 0; i < searchVec.size(); ++i) {
Search *s = searchVec[i];
while (s->queue.size() > 0) {
- Routable::UP msg = s->queue.dequeue(0);
- ASSERT_TRUE(msg.get() != 0);
+ Routable::UP msg = s->queue.dequeue();
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
msg->swapState(*reply);
s->session->reply(std::move(reply));
@@ -510,21 +510,21 @@ Test::debugTrace()
for (uint32_t i = 0; i < dpVec.size(); ++i) {
DocProc *p = dpVec[i];
while (p->queue.size() > 0) {
- Routable::UP r = p->queue.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = p->queue.dequeue();
+ ASSERT_TRUE(r);
p->session->forward(std::move(r));
}
}
client->waitQueueSize(3);
- Routable::UP reply = client->queue.dequeue(0);
+ Routable::UP reply = client->queue.dequeue();
fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
reply->getTrace().getLevel(),
reply->getTrace().toString().c_str());
- reply = client->queue.dequeue(0);
+ reply = client->queue.dequeue();
fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
reply->getTrace().getLevel(),
reply->getTrace().toString().c_str());
- reply = client->queue.dequeue(0);
+ reply = client->queue.dequeue();
fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n",
reply->getTrace().getLevel(),
reply->getTrace().toString().c_str());
diff --git a/messagebus/src/tests/resender/resender.cpp b/messagebus/src/tests/resender/resender.cpp
index cd7fdbeb6cc..f40dbdc5e4a 100644
--- a/messagebus/src/tests/resender/resender.cpp
+++ b/messagebus/src/tests/resender/resender.cpp
@@ -3,9 +3,7 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
-#include <vespa/messagebus/testlib/custompolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
@@ -32,7 +30,7 @@ StringList::add(const string &str)
std::vector<string>::push_back(str); return *this;
}
-static const double GET_MESSAGE_TIMEOUT = 60.0;
+static const duration GET_MESSAGE_TIMEOUT = 60s;
////////////////////////////////////////////////////////////////////////////////
//
@@ -158,20 +156,20 @@ Test::testRetryTag(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
for (uint32_t i = 0; i < 5; ++i) {
EXPECT_EQUAL(i, msg->getRetry());
EXPECT_EQUAL(true, msg->getRetryEnabled());
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0);
msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
}
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
printf("%s", reply->getTrace().toString().c_str());
}
@@ -183,14 +181,14 @@ Test::testRetryEnabledTag(TestData &data)
msg->setRetryEnabled(false);
EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted());
msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
EXPECT_EQUAL(false, msg->getRetryEnabled());
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
printf("%s", reply->getTrace().toString().c_str());
}
@@ -200,16 +198,16 @@ Test::testTransientError(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0);
msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasFatalErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
printf("%s", reply->getTrace().toString().c_str());
}
@@ -219,13 +217,13 @@ Test::testFatalError(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasFatalErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
printf("%s", reply->getTrace().toString().c_str());
}
@@ -235,14 +233,14 @@ Test::testDisableRetry(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasErrors());
EXPECT_TRUE(!reply->hasFatalErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
printf("%s", reply->getTrace().toString().c_str());
}
@@ -253,19 +251,19 @@ Test::testRetryDelay(TestData &data)
data._retryPolicy->setBaseDelay(0.01);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
for (uint32_t i = 0; i < 5; ++i) {
EXPECT_EQUAL(i, msg->getRetry());
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, -1);
msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
}
replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasFatalErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
string trace = reply->getTrace().toString();
EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos);
@@ -282,19 +280,19 @@ Test::testRequestRetryDelay(TestData &data)
data._retryPolicy->setBaseDelay(1);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
for (uint32_t i = 0; i < 5; ++i) {
EXPECT_EQUAL(i, msg->getRetry());
replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, i / 50.0);
msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
}
replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0);
Reply::UP reply = data._srcHandler.getReply();
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(reply->hasFatalErrors());
- msg = data._dstHandler.getMessage(0);
- EXPECT_TRUE(msg.get() == NULL);
+ msg = data._dstHandler.getMessageNow();
+ EXPECT_FALSE(msg);
string trace = reply->getTrace().toString();
EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos);
diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp
index a7a35508656..88db25fa9c7 100644
--- a/messagebus/src/tests/routable/routable.cpp
+++ b/messagebus/src/tests/routable/routable.cpp
@@ -73,7 +73,7 @@ Test::Main()
msg.pushHandler(handler);
msg.discard();
- Reply::UP reply = handler.getReply(0);
+ Reply::UP reply = handler.getReply(duration::zero());
ASSERT_FALSE(reply);
}
{
@@ -86,7 +86,7 @@ Test::Main()
reply.swapState(msg);
reply.discard();
- Reply::UP ap = handler.getReply(0);
+ Reply::UP ap = handler.getReply(duration::zero());
ASSERT_FALSE(ap);
}
diff --git a/messagebus/src/tests/routablequeue/routablequeue.cpp b/messagebus/src/tests/routablequeue/routablequeue.cpp
index 4e7c918134f..04e90984f88 100644
--- a/messagebus/src/tests/routablequeue/routablequeue.cpp
+++ b/messagebus/src/tests/routablequeue/routablequeue.cpp
@@ -40,8 +40,8 @@ Test::Main()
{
RoutableQueue rq;
EXPECT_TRUE(rq.size() == 0);
- EXPECT_TRUE(rq.dequeue(0).get() == 0);
- EXPECT_TRUE(rq.dequeue(100).get() == 0);
+ EXPECT_TRUE(rq.dequeue().get() == 0);
+ EXPECT_TRUE(rq.dequeue(100ms).get() == 0);
EXPECT_TRUE(TestMessage::getCnt() == 0);
EXPECT_TRUE(TestReply::getCnt() == 0);
rq.enqueue(Routable::UP(new TestMessage(101)));
@@ -61,16 +61,16 @@ Test::Main()
EXPECT_TRUE(TestMessage::getCnt() == 2);
EXPECT_TRUE(TestReply::getCnt() == 2);
{
- Routable::UP r = rq.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = rq.dequeue();
+ ASSERT_TRUE(r);
EXPECT_TRUE(rq.size() == 3);
EXPECT_TRUE(r->getType() == 101);
}
EXPECT_TRUE(TestMessage::getCnt() == 1);
EXPECT_TRUE(TestReply::getCnt() == 2);
{
- Routable::UP r = rq.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = rq.dequeue();
+ ASSERT_TRUE(r);
EXPECT_TRUE(rq.size() == 2);
EXPECT_TRUE(r->getType() == 201);
}
@@ -85,16 +85,16 @@ Test::Main()
EXPECT_TRUE(TestMessage::getCnt() == 2);
EXPECT_TRUE(TestReply::getCnt() == 2);
{
- Routable::UP r = rq.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = rq.dequeue();
+ ASSERT_TRUE(r);
EXPECT_TRUE(rq.size() == 3);
EXPECT_TRUE(r->getType() == 102);
}
EXPECT_TRUE(TestMessage::getCnt() == 1);
EXPECT_TRUE(TestReply::getCnt() == 2);
{
- Routable::UP r = rq.dequeue(0);
- ASSERT_TRUE(r.get() != 0);
+ Routable::UP r = rq.dequeue();
+ ASSERT_TRUE(r);
EXPECT_TRUE(rq.size() == 2);
EXPECT_TRUE(r->getType() == 202);
}
diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp
index 42c5938fe92..48c86a53160 100644
--- a/messagebus/src/tests/routing/routing.cpp
+++ b/messagebus/src/tests/routing/routing.cpp
@@ -8,7 +8,6 @@
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/component/vtag.h>
@@ -485,7 +484,7 @@ private:
bool testTrace(TestData &data, const std::vector<string> &expected);
bool testTrace(const std::vector<string> &expected, const Trace &trace);
- static const double RECEPTOR_TIMEOUT;
+ static const duration RECEPTOR_TIMEOUT;
public:
int Main() override;
@@ -540,7 +539,7 @@ public:
void requireThatDepthLimitCanBeIgnored(TestData &data);
};
-const double Test::RECEPTOR_TIMEOUT = 120.0;
+const duration Test::RECEPTOR_TIMEOUT = 120s;
TEST_APPHOOK(Test);
@@ -613,7 +612,7 @@ bool
Test::testAcknowledge(TestData &data)
{
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- if (!EXPECT_TRUE(msg.get() != NULL)) {
+ if (!EXPECT_TRUE(msg)) {
return false;
}
data._dstSession->acknowledge(std::move(msg));
@@ -632,7 +631,7 @@ bool
Test::testTrace(TestData &data, const std::vector<string> &expected)
{
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- if (!EXPECT_TRUE(reply.get() != NULL)) {
+ if (!EXPECT_TRUE(reply)) {
return false;
}
if (!EXPECT_TRUE(!reply->hasErrors())) {
@@ -747,7 +746,7 @@ Test::testNoRoutingTable(TestData &data)
EXPECT_TRUE(!res.isAccepted());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode());
Message::UP msg = res.getMessage();
- EXPECT_TRUE(msg.get() != NULL);
+ EXPECT_TRUE(msg);
}
void
@@ -759,7 +758,7 @@ Test::testUnknownRoute(TestData &data)
EXPECT_TRUE(!res.isAccepted());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode());
Message::UP msg = res.getMessage();
- EXPECT_TRUE(msg.get() != NULL);
+ EXPECT_TRUE(msg);
}
void
@@ -767,7 +766,7 @@ Test::testNoRoute(TestData &data)
{
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route()).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -779,10 +778,10 @@ Test::testRecognizeHopName(TestData &data)
.addHop(HopSpec("dst", "dst/session"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -794,10 +793,10 @@ Test::testRecognizeRouteDirective(TestData &data)
.addHop(HopSpec("dir", "route:dst"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dir")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -808,10 +807,10 @@ Test::testRecognizeRouteName(TestData &data)
.addRoute(RouteSpec("dst").addHop("dst/session"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -823,7 +822,7 @@ Test::testHopResolutionOverflow(TestData &data)
.addHop(HopSpec("bar", "foo"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -835,7 +834,7 @@ Test::testRouteResolutionOverflow(TestData &data)
.addRoute(RouteSpec("foo").addHop("route:foo"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), "foo").isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -847,13 +846,13 @@ Test::testInsertRoute(TestData &data)
.addRoute(RouteSpec("foo").addHop("dst/session").addHop("bar"))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("route:foo baz")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
EXPECT_EQUAL(2u, msg->getRoute().getNumHops());
EXPECT_EQUAL("bar", msg->getRoute().getHop(0).toString());
EXPECT_EQUAL("baz", msg->getRoute().getHop(1).toString());
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -864,7 +863,7 @@ Test::testErrorDirective(TestData &data)
route.getHop(0).setDirective(1, IHopDirective::SP(new ErrorDirective("err")));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
EXPECT_EQUAL("err", reply->getError(0).getMessage());
@@ -879,7 +878,7 @@ Test::testSelectError(TestData &data)
data._srcServer.mb.putProtocol(protocol);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom: ]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
LOG(info, "testSelectError trace=%s", reply->getTrace().toString().c_str());
LOG(info, "testSelectError error=%s", reply->getError(0).toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
@@ -895,7 +894,7 @@ Test::testSelectNone(TestData &data)
data._srcServer.mb.putProtocol(protocol);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_SERVICES_FOR_ROUTE, reply->getError(0).getCode());
}
@@ -909,10 +908,10 @@ Test::testSelectOne(TestData &data)
data._srcServer.mb.putProtocol(protocol);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -922,22 +921,22 @@ Test::testResend1(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
reply.reset(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("[APP_TRANSIENT_ERROR @ localhost]: err1")
@@ -957,22 +956,22 @@ Test::testResend2(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
reply.reset(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Source session accepted a 3 byte message. 1 message(s) now pending.")
@@ -1022,13 +1021,13 @@ Test::testNoResend(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1"));
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_TRANSIENT_ERROR, reply->getError(0).getCode());
}
@@ -1043,16 +1042,16 @@ Test::testSelectOnResend(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Selecting { 'dst/session' }.")
@@ -1075,16 +1074,16 @@ Test::testNoSelectOnResend(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Selecting { 'dst/session' }.")
@@ -1107,10 +1106,10 @@ Test::testCanConsumeError(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session,dst/unknown]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
EXPECT_TRUE(testTrace(StringList()
@@ -1131,7 +1130,7 @@ Test::testCantConsumeError(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/unknown]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
@@ -1152,10 +1151,10 @@ Test::testNestedPolicies(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
}
@@ -1173,10 +1172,10 @@ Test::testRemoveReply(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("[NO_ADDRESS_FOR_SERVICE @ localhost]")
@@ -1197,10 +1196,10 @@ Test::testSetReply(TestData &data)
data._retryPolicy->setEnabled(false);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Select:[SetReply:foo],dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL("foo", reply->getError(0).getMessage());
@@ -1221,16 +1220,16 @@ Test::testResendSetAndReuseReply(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[ReuseReply:[SetReply:foo],dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "dst"));
data._dstSession->reply(std::move(reply));
msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
}
@@ -1250,10 +1249,10 @@ Test::testResendSetAndRemoveReply(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[RemoveReply:[SetReply:foo],dst/session]")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL("foo", reply->getError(0).getMessage());
@@ -1271,13 +1270,13 @@ Test::testHopIgnoresReply(TestData &data)
{
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("?dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst"));
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Not waiting for a reply from 'dst/session'."),
@@ -1291,13 +1290,13 @@ Test::testHopBlueprintIgnoresReply(TestData &data)
.addHop(HopSpec("foo", "dst/session").setIgnoreResult(true))));
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Reply::UP reply(new EmptyReply());
reply->swapState(*msg);
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst"));
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Not waiting for a reply from 'dst/session'."),
@@ -1309,12 +1308,12 @@ Test::testAcceptEmptyRoute(TestData &data)
{
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
const Route &route = msg->getRoute();
EXPECT_EQUAL(0u, route.getNumHops());
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
}
void
@@ -1333,7 +1332,7 @@ Test::testAbortOnlyActiveNodes(TestData &data)
data._retryPolicy->setEnabled(true);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(2u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL((uint32_t)ErrorCode::SEND_ABORTED, reply->getError(1).getCode());
@@ -1344,7 +1343,7 @@ Test::testUnknownPolicy(TestData &data)
{
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Unknown]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::UNKNOWN_POLICY, reply->getError(0).getCode());
}
@@ -1362,7 +1361,7 @@ Test::testSelectException(TestData &data)
Route::parse("[SelectException]"))
.isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR,
reply->getError(0).getCode());
@@ -1383,10 +1382,10 @@ Test::testMergeException(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route)
.isAccepted());
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR,
reply->getError(0).getCode());
@@ -1423,7 +1422,7 @@ Test::requireThatIgnoreFlagIsSerializedWithMessage(TestData &data)
{
ASSERT_TRUE(testSend(data, "dst/session foo ?bar"));
Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
Route route = msg->getRoute();
EXPECT_EQUAL(2u, route.getNumHops());
Hop hop = route.getHop(0);
@@ -1533,10 +1532,10 @@ Test::testTimeout(TestData &data)
{
data._retryPolicy->setEnabled(true);
data._retryPolicy->setBaseDelay(0.01);
- data._srcSession->setTimeout(0.5);
+ data._srcSession->setTimeout(500ms);
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/unknown")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(2u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(1).getCode());
diff --git a/messagebus/src/tests/routingcontext/routingcontext.cpp b/messagebus/src/tests/routingcontext/routingcontext.cpp
index fa0d8ed6536..1c971b29ee3 100644
--- a/messagebus/src/tests/routingcontext/routingcontext.cpp
+++ b/messagebus/src/tests/routingcontext/routingcontext.cpp
@@ -23,7 +23,7 @@ using namespace mbus;
using vespalib::make_string;
-static const double TIMEOUT = 120;
+static const duration TIMEOUT = 120s;
class StringList : public std::vector<string> {
public:
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp
index c4fc0f908e0..aed4de7b228 100644
--- a/messagebus/src/tests/sendadapter/sendadapter.cpp
+++ b/messagebus/src/tests/sendadapter/sendadapter.cpp
@@ -59,7 +59,7 @@ public:
bool start();
};
-static const int TIMEOUT_SECS = 6;
+static const duration TIMEOUT_SECS = 6s;
TestData::TestData() :
_slobrok(),
@@ -121,7 +121,7 @@ testVersionedSend(TestData &data,
return false;
}
msg = data._itrHandler.getMessage(TIMEOUT_SECS);
- if (!EXPECT_TRUE(msg.get() != NULL)) {
+ if (!EXPECT_TRUE(msg)) {
return false;
}
LOG(info, "Message version %s serialized at source.",
@@ -138,7 +138,7 @@ testVersionedSend(TestData &data,
}
data._itrSession->forward(std::move(msg));
msg = data._dstHandler.getMessage(TIMEOUT_SECS);
- if (!EXPECT_TRUE(msg.get() != NULL)) {
+ if (!EXPECT_TRUE(msg)) {
return false;
}
LOG(info, "Message version %s serialized at intermediate.",
@@ -157,7 +157,7 @@ testVersionedSend(TestData &data,
reply->swapState(*msg);
data._dstSession->reply(std::move(reply));
reply = data._itrHandler.getReply();
- if (!EXPECT_TRUE(reply.get() != NULL)) {
+ if (!EXPECT_TRUE(reply)) {
return false;
}
LOG(info, "Reply version %s serialized at destination.",
@@ -173,7 +173,7 @@ testVersionedSend(TestData &data,
}
data._itrSession->forward(std::move(reply));
reply = data._srcHandler.getReply();
- if (!EXPECT_TRUE(reply.get() != NULL)) {
+ if (!EXPECT_TRUE(reply)) {
return false;
}
LOG(info, "Reply version %s serialized at intermediate.",
diff --git a/messagebus/src/tests/sequencer/sequencer.cpp b/messagebus/src/tests/sequencer/sequencer.cpp
index 218c1e43929..d347c3855cb 100644
--- a/messagebus/src/tests/sequencer/sequencer.cpp
+++ b/messagebus/src/tests/sequencer/sequencer.cpp
@@ -21,7 +21,7 @@ struct MyQueue : public RoutableQueue {
virtual ~MyQueue() {
while (size() > 0) {
- Routable::UP obj = dequeue(0);
+ Routable::UP obj = dequeue();
obj->getCallStack().discard();
}
}
@@ -31,14 +31,14 @@ struct MyQueue : public RoutableQueue {
LOG(error, "checkReply(): No reply in queue.");
return false;
}
- Routable::UP obj = dequeue(0);
+ Routable::UP obj = dequeue();
if (!obj->isReply()) {
LOG(error, "checkReply(): Got message when expecting reply.");
return false;
}
Reply::UP reply(static_cast<Reply*>(obj.release()));
Message::UP msg = reply->getMessage();
- if (msg.get() == NULL) {
+ if ( ! msg) {
LOG(error, "checkReply(): Reply has no message attached.");
return false;
}
@@ -64,7 +64,7 @@ struct MyQueue : public RoutableQueue {
}
void replyNext() {
- Routable::UP obj = dequeue(0);
+ Routable::UP obj = dequeue();
Message::UP msg(static_cast<Message*>(obj.release()));
Reply::UP reply(new EmptyReply());
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index 1706da3b55f..e415622707f 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -30,7 +30,7 @@ public:
}
};
-static const double TIMEOUT = 120;
+static const duration TIMEOUT = 120s;
TEST_APPHOOK(Test);
@@ -77,11 +77,11 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe()
SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams()
.setThrottlePolicy(IThrottlePolicy::SP())
.setReplyHandler(srcHandler));
- ASSERT_TRUE(srcSession.get() != NULL);
+ ASSERT_TRUE(srcSession);
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
ASSERT_TRUE(srcSession->send(std::move(msg), "dst/session", true).isAccepted());
msg = dstHandler.getMessage(TIMEOUT);
- ASSERT_TRUE(msg.get() != NULL);
+ ASSERT_TRUE(msg);
}
dstSession->acknowledge(std::move(msg));
}
diff --git a/messagebus/src/tests/sourcesession/sourcesession.cpp b/messagebus/src/tests/sourcesession/sourcesession.cpp
index 5177cf0e799..c793dd435c8 100644
--- a/messagebus/src/tests/sourcesession/sourcesession.cpp
+++ b/messagebus/src/tests/sourcesession/sourcesession.cpp
@@ -102,11 +102,11 @@ Test::testSequencing()
FastOS_Thread::Sleep(250);
EXPECT_TRUE(waitQueueSize(dstQ, 2));
EXPECT_TRUE(waitQueueSize(srcQ, 0));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
EXPECT_TRUE(waitQueueSize(srcQ, 2));
EXPECT_TRUE(waitQueueSize(dstQ, 1));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 3));
ASSERT_TRUE(waitQueueSize(dstQ, 0));
}
@@ -137,7 +137,7 @@ Test::testResendError()
}
EXPECT_TRUE(waitQueueSize(dstQ, 1));
{
- Routable::UP r = dstQ.dequeue(0);
+ Routable::UP r = dstQ.dequeue();
Reply::UP reply(new EmptyReply());
r->swapState(*reply);
reply->addError(Error(ErrorCode::FATAL_ERROR, "error"));
@@ -153,7 +153,7 @@ Test::testResendError()
}
EXPECT_TRUE(waitQueueSize(dstQ, 1));
{
- Routable::UP r = dstQ.dequeue(0);
+ Routable::UP r = dstQ.dequeue();
Reply::UP reply(new EmptyReply());
r->swapState(*reply);
reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "error"));
@@ -161,12 +161,12 @@ Test::testResendError()
}
EXPECT_TRUE(waitQueueSize(dstQ, 1));
EXPECT_TRUE(waitQueueSize(srcQ, 1));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 2));
ASSERT_TRUE(waitQueueSize(dstQ, 0));
{
- string trace1 = srcQ.dequeue(0)->getTrace().toString();
- string trace2 = srcQ.dequeue(0)->getTrace().toString();
+ string trace1 = srcQ.dequeue()->getTrace().toString();
+ string trace2 = srcQ.dequeue()->getTrace().toString();
fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace1.c_str());
fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace2.c_str());
}
@@ -202,7 +202,7 @@ Test::testResendConnDown()
msg->getTrace().setLevel(9);
EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dst2Q, 1));
- Routable::UP obj = dst2Q.dequeue(0);
+ Routable::UP obj = dst2Q.dequeue();
obj->discard();
src.mb.setupRouting(RoutingSpec().addTable(RoutingTableSpec(SimpleProtocol::NAME)
.addHop(HopSpec("dst", "dst/session"))));
@@ -210,11 +210,11 @@ Test::testResendConnDown()
ASSERT_TRUE(waitQueueSize(dstQ, 1)); // fails
ASSERT_TRUE(waitQueueSize(srcQ, 0));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 1));
ASSERT_TRUE(waitQueueSize(dstQ, 0));
- string trace = srcQ.dequeue(0)->getTrace().toString();
+ string trace = srcQ.dequeue()->getTrace().toString();
fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str());
}
@@ -240,7 +240,7 @@ Test::testIllegalRoute()
ASSERT_TRUE(waitQueueSize(srcQ, 1));
{
while (srcQ.size() > 0) {
- Routable::UP routable = srcQ.dequeue(0);
+ Routable::UP routable = srcQ.dequeue();
ASSERT_TRUE(routable->isReply());
Reply::UP r(static_cast<Reply*>(routable.release()));
EXPECT_EQUAL(1u, r->getNumErrors());
@@ -272,7 +272,7 @@ Test::testNoServices()
ASSERT_TRUE(waitQueueSize(srcQ, 1));
{
while (srcQ.size() > 0) {
- Routable::UP routable = srcQ.dequeue(0);
+ Routable::UP routable = srcQ.dequeue();
ASSERT_TRUE(routable->isReply());
Reply::UP r(static_cast<Reply*>(routable.release()));
EXPECT_TRUE(r->getNumErrors() == 1);
@@ -300,7 +300,7 @@ Test::testBlockingClose()
EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo")), "dst").isAccepted());
ss->close();
srcQ.handleMessage(Message::UP(new SimpleMessage("bogus")));
- Routable::UP routable = srcQ.dequeue(0);
+ Routable::UP routable = srcQ.dequeue();
EXPECT_TRUE(routable->isReply());
}
diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp
index 5d3525e8ba6..6599604bf9a 100644
--- a/messagebus/src/tests/throttling/throttling.cpp
+++ b/messagebus/src/tests/throttling/throttling.cpp
@@ -3,7 +3,6 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/destinationsession.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routablequeue.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/routing/routingspec.h>
@@ -12,7 +11,6 @@
#include <vespa/messagebus/staticthrottlepolicy.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/testserver.h>
@@ -141,15 +139,15 @@ Test::testMaxPendingCount()
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 5));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 1));
EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted());
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 5));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 3));
EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted());
@@ -158,11 +156,11 @@ Test::testMaxPendingCount()
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 5));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 8));
ASSERT_TRUE(waitQueueSize(dstQ, 0));
}
@@ -202,17 +200,17 @@ Test::testMaxPendingSize()
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 2));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 1));
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted());
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 2));
EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("12")), "dst").isAccepted());
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 1));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 3));
}
@@ -244,7 +242,7 @@ Test::testMinOne()
EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("")), "dst").isAccepted());
EXPECT_TRUE(waitQueueSize(dstQ, 1));
- ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release()));
+ ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release()));
ASSERT_TRUE(waitQueueSize(srcQ, 1));
EXPECT_TRUE(waitQueueSize(dstQ, 0));
}
diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp
index b2631e13d9c..980681a9d42 100644
--- a/messagebus/src/tests/timeout/timeout.cpp
+++ b/messagebus/src/tests/timeout/timeout.cpp
@@ -76,7 +76,7 @@ Test::testMessageExpires()
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
- Message::UP msg = dstHandler.getMessage(1);
+ Message::UP msg = dstHandler.getMessage(1s);
if (msg) {
msg->discard();
}
diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h
index df25bf17973..11594fcdc22 100644
--- a/messagebus/src/vespa/messagebus/common.h
+++ b/messagebus/src/vespa/messagebus/common.h
@@ -2,17 +2,14 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
-#include <chrono>
+#include <vespa/vespalib/util/time.h>
namespace mbus {
// Decide the type of string used once
using string = vespalib::string;
-
-using seconds = std::chrono::duration<double>;
-using milliseconds = std::chrono::milliseconds;
-
-
+using duration = vespalib::duration;
+using time_point = vespalib::steady_clock::time_point;
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp
index c9fb03dd013..305ffb06aa0 100644
--- a/messagebus/src/vespa/messagebus/message.cpp
+++ b/messagebus/src/vespa/messagebus/message.cpp
@@ -10,7 +10,6 @@
#include <vespa/log/log.h>
LOG_SETUP(".message");
-using namespace std::chrono;
namespace mbus {
Message::Message() :
@@ -58,14 +57,14 @@ Message::swapState(Routable &rhs)
Message &
Message::setTimeReceivedNow()
{
- _timeReceived = steady_clock::now();
+ _timeReceived = vespalib::steady_clock::now();
return *this;
}
-milliseconds
+duration
Message::getTimeRemainingNow() const
{
- return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived));
+ return std::max(0ns, _timeRemaining - (vespalib::steady_clock::now() - _timeReceived));
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h
index 3069e45e6d8..15e7384707c 100644
--- a/messagebus/src/vespa/messagebus/message.h
+++ b/messagebus/src/vespa/messagebus/message.h
@@ -3,7 +3,6 @@
#include "routable.h"
#include <vespa/messagebus/routing/route.h>
-#include <chrono>
namespace mbus {
@@ -55,7 +54,7 @@ public:
*
* @return The remaining time in milliseconds.
*/
- milliseconds getTimeRemaining() const { return _timeRemaining; }
+ duration getTimeRemaining() const { return _timeRemaining; }
/**
* Sets the numer of milliseconds that remain before this message times
@@ -65,7 +64,7 @@ public:
* @param timeRemaining The number of milliseconds until expiration.
* @return This, to allow chaining.
*/
- Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; }
+ Message &setTimeRemaining(duration timeRemaining) { _timeRemaining = timeRemaining; return *this; }
/**
* Returns the number of milliseconds that remain right now before this
@@ -79,7 +78,7 @@ public:
*
* @return The remaining time in milliseconds.
*/
- milliseconds getTimeRemainingNow() const;
+ duration getTimeRemainingNow() const;
/**
* Access the route associated with this message.
@@ -185,7 +184,7 @@ public:
private:
Route _route;
time_point _timeReceived;
- milliseconds _timeRemaining;
+ duration _timeRemaining;
bool _retryEnabled;
uint32_t _retry;
};
diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h
index 037298cf7c0..1777b9e69f5 100644
--- a/messagebus/src/vespa/messagebus/network/inetwork.h
+++ b/messagebus/src/vespa/messagebus/network/inetwork.h
@@ -62,7 +62,7 @@ public:
* @param seconds The timeout.
* @return True if ready.
*/
- virtual bool waitUntilReady(seconds timeout) const = 0;
+ virtual bool waitUntilReady(duration timeout) const = 0;
/**
* Register a session name with the network layer. This will make the
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 5ae6b07c3fa..0bc7f9f3399 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -225,12 +225,12 @@ RPCNetwork::start()
}
bool
-RPCNetwork::waitUntilReady(seconds timeout) const
+RPCNetwork::waitUntilReady(duration timeout) const
{
slobrok::api::SlobrokList brokerList;
slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList);
bool hasConfig = false;
- for (uint32_t i = 0; i < timeout.count() * 100; ++i) {
+ for (int64_t i = 0; i < vespalib::count_ms(timeout)/10; ++i) {
if (configurator->poll()) {
hasConfig = true;
}
@@ -240,10 +240,10 @@ RPCNetwork::waitUntilReady(seconds timeout) const
std::this_thread::sleep_for(10ms);
}
if (! hasConfig) {
- LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count());
+ LOG(error, "failed to get config for slobroks in %2.2f seconds", vespalib::to_s(timeout));
} else if (! _mirror->ready()) {
auto brokers = brokerList.logString();
- LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count());
+ LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), vespalib::to_s(timeout));
}
return false;
}
@@ -320,7 +320,7 @@ void
RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients)
{
SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self
- seconds timeout = ctx._msg.getTimeRemainingNow();
+ duration timeout = ctx._msg.getTimeRemainingNow();
for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) {
RoutingNode *&recipient = ctx._recipients[i];
@@ -373,7 +373,7 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx)
make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str()));
} else {
- std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow();
+ duration timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
RPCSendAdapter *adapter = getSendAdapter(ctx._version);
if (adapter == nullptr) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 169bdd86dd9..a6c2724929d 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -219,7 +219,7 @@ public:
void attach(INetworkOwner &owner) override;
const string getConnectionSpec() const override;
bool start() override;
- bool waitUntilReady(seconds timout) const override;
+ bool waitUntilReady(duration timout) const override;
void registerSession(const string &session) override;
void unregisterSession(const string &session) override;
bool allocServiceAddress(RoutingNode &recipient) override;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index e0fae8eabd6..2422638dc05 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -98,22 +98,22 @@ RPCSend::handleDiscard(Context ctx)
}
void
-RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining)
+RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, duration timeRemaining)
{
send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
}
void
-RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining)
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, duration timeRemaining)
{
send(recipient, version, FillByCopy(payload), timeRemaining);
}
void
RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & payload, milliseconds timeRemaining)
+ const PayLoadFiller & payload, duration timeRemaining)
{
- SendContext::UP ctx(new SendContext(recipient, timeRemaining));
+ auto ctx = std::make_unique<SendContext>(recipient, timeRemaining);
RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
const Message &msg = recipient.getMessage();
Route route = recipient.getRoute();
@@ -126,7 +126,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
version.toString().c_str(), _clientIdent.c_str(),
- address.getServiceName().c_str(), ctx->getTimeout().count()));
+ address.getServiceName().c_str(), vespalib::to_s(ctx->getTimeout())));
}
if (hop.getIgnoreResult()) {
@@ -135,13 +135,13 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
}
- Reply::UP reply(new EmptyReply());
+ auto reply = std::make_unique<EmptyReply>();
reply->getTrace().swap(ctx->getTrace());
_net->getOwner().deliverReply(std::move(reply), recipient);
} else {
SendContext *ptr = ctx.release();
req->SetContext(FNET_Context(ptr));
- address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout().count(), this);
+ address.getTarget().getFRTTarget().InvokeAsync(req, vespalib::to_s(ptr->getTimeout()), this);
}
}
@@ -159,12 +159,12 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) {
Error error;
Trace & trace = ctx->getTrace();
if (!req->CheckReturnTypes(getReturnSpec())) {
- reply.reset(new EmptyReply());
+ reply = std::make_unique<EmptyReply>();
switch (req->GetErrorCode()) {
case FRTE_RPC_TIMEOUT:
error = Error(ErrorCode::TIMEOUT,
make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
- serviceName.c_str(), ctx->getTimeout().count(), req->GetErrorMessage()));
+ serviceName.c_str(), vespalib::to_s(ctx->getTimeout()), req->GetErrorMessage()));
break;
case FRTE_RPC_CONNECTION:
error = Error(ErrorCode::CONNECTION_ERROR,
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index e7bf5495974..f3a9177d236 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -44,7 +44,7 @@ public:
virtual uint32_t getTraceLevel() const = 0;
virtual bool useRetry() const = 0;
virtual uint32_t getRetries() const = 0;
- virtual milliseconds getRemainingTime() const = 0;
+ virtual duration getRemainingTime() const = 0;
virtual vespalib::stringref getRoute() const = 0;
virtual vespalib::stringref getSession() const = 0;
virtual BlobRef getPayload() const = 0;
@@ -59,13 +59,13 @@ protected:
Error & error, vespalib::TraceNode & rootTrace) const = 0;
virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, milliseconds timeRemaining) const = 0;
+ const PayLoadFiller &filler, duration timeRemaining) const = 0;
virtual const char * getReturnSpec() const = 0;
virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0;
virtual std::unique_ptr<Params> toParams(const FRT_Values &param) const = 0;
void send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & filler, milliseconds timeRemaining);
+ const PayLoadFiller & filler, duration timeRemaining);
std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version,
BlobRef payload, Error & error) const;
/**
@@ -89,9 +89,9 @@ private:
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, milliseconds timeRemaining) final override;
+ Blob payload, duration timeRemaining) final override;
void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, milliseconds timeRemaining) final override;
+ BlobRef payload, duration timeRemaining) final override;
void RequestDone(FRT_RPCRequest *req) final override;
void handleReply(std::unique_ptr<Reply> reply) final override;
};
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
index 0b620b3b11f..0e299366e77 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
@@ -14,18 +14,18 @@ public:
using UP = std::unique_ptr<SendContext>;
SendContext(const SendContext &) = delete;
SendContext & operator = (const SendContext &) = delete;
- SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining)
+ SendContext(mbus::RoutingNode &recipient, duration timeRemaining)
: _recipient(recipient),
_trace(recipient.getTrace().getLevel()),
_timeout(timeRemaining)
{ }
mbus::RoutingNode &getRecipient() { return _recipient; }
mbus::Trace &getTrace() { return _trace; }
- seconds getTimeout() { return _timeout; }
+ duration getTimeout() { return _timeout; }
private:
mbus::RoutingNode &_recipient;
mbus::Trace _trace;
- seconds _timeout;
+ duration _timeout;
};
/**
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
index cc89bb022b9..15b63c3117c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
@@ -43,7 +43,7 @@ public:
* @param timeRemaining The time remaining until the message expires.
*/
virtual void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, milliseconds timeRemaining) = 0;
+ BlobRef payload, duration timeRemaining) = 0;
/**
* Performs the actual sending to the given recipient.
@@ -54,7 +54,7 @@ public:
* @param timeRemaining The time remaining until the message expires.
*/
virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, milliseconds timeRemaining) = 0;
+ Blob payload, duration timeRemaining) = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
index e902aa20965..388ab3309c4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
@@ -4,7 +4,6 @@
#include "rpcnetwork.h"
#include "rpcserviceaddress.h"
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/tracelevel.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/frt/reflection.h>
@@ -59,7 +58,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder)
void
RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, milliseconds timeRemaining) const
+ const PayLoadFiller &filler, duration timeRemaining) const
{
FRT_Values &args = *req.GetParams();
@@ -69,7 +68,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version,
args.AddString(address.getSessionName().c_str());
args.AddInt8(msg.getRetryEnabled() ? 1 : 0);
args.AddInt32(msg.getRetry());
- args.AddInt64(timeRemaining.count());
+ args.AddInt64(vespalib::count_ms(timeRemaining));
args.AddString(msg.getProtocol().c_str());
filler.fill(args);
args.AddInt32(traceLevel);
@@ -85,7 +84,7 @@ public:
uint32_t getTraceLevel() const override { return _args[8]._intval32; }
bool useRetry() const override { return _args[3]._intval8 != 0; }
uint32_t getRetries() const override { return _args[4]._intval32; }
- milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); }
+ duration getRemainingTime() const override { return std::chrono::milliseconds(_args[5]._intval64); }
vespalib::Version getVersion() const override {
return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len));
@@ -135,7 +134,7 @@ RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error
reply = decode(protocolName, version, payload, error);
}
if ( ! reply ) {
- reply.reset(new EmptyReply());
+ reply = std::make_unique<EmptyReply>();
}
reply->setRetryDelay(retryDelay);
for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
index 3265c304830..249acc50e0c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
@@ -14,7 +14,7 @@ private:
std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, milliseconds timeRemaining) const override;
+ const PayLoadFiller &filler, duration timeRemaining) const override;
std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
Error & error, vespalib::TraceNode & rootTrace) const override;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
index 3e453bb60eb..3b0c10500b9 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
@@ -101,7 +101,7 @@ private:
void
RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, milliseconds timeRemaining) const
+ const PayLoadFiller &filler, duration timeRemaining) const
{
FRT_Values &args = *req.GetParams();
req.SetMethodName(METHOD_NAME);
@@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout
root.setString(SESSION_F, address.getSessionName());
root.setBool(USERETRY_F, msg.getRetryEnabled());
root.setLong(RETRY_F, msg.getRetry());
- root.setLong(TIMELEFT_F, timeRemaining.count());
+ root.setLong(TIMELEFT_F, vespalib::count_ms(timeRemaining));
root.setString(PROTOCOL_F, msg.getProtocol());
root.setLong(TRACELEVEL_F, traceLevel);
filler.fill(BLOB_F, root);
@@ -156,7 +156,7 @@ public:
uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); }
bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); }
uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); }
- milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); }
+ duration getRemainingTime() const override { return std::chrono::milliseconds(_slime.get()[TIMELEFT_F].asLong()); }
Version getVersion() const override {
return Version(_slime.get()[VERSION_F].asString().make_stringref());
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
index 939c37c81b2..c48aa90a9fb 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
@@ -14,7 +14,7 @@ private:
std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, milliseconds timeRemaining) const override;
+ const PayLoadFiller &filler, duration timeRemaining) const override;
std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
Error & error, vespalib::TraceNode & rootTrace) const override;
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index bb3bf1d172d..63470b6b707 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -23,7 +23,7 @@ RPCTarget::~RPCTarget()
}
void
-RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler)
+RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler)
{
bool hasVersion = false;
bool shouldInvoke = false;
@@ -47,7 +47,7 @@ RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler)
} else if (shouldInvoke) {
FRT_RPCRequest *req = _orb.AllocRPCRequest();
req->SetMethodName("mbus.getVersion");
- _target.InvokeAsync(req, timeout.count(), this);
+ _target.InvokeAsync(req, vespalib::to_s(timeout), this);
}
}
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index 9c089381de7..b6488f25cb7 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -88,7 +88,7 @@ public:
* @param timeout The timeout for the request in milliseconds.
* @param handler The handler to be called once the version is available.
*/
- void resolveVersion(seconds timeout, IVersionHandler &handler);
+ void resolveVersion(duration timeout, IVersionHandler &handler);
/**
* @return true if the FRT target is valid or has been invoked (which
diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp
index eb2d93c6688..121056044ba 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.cpp
+++ b/messagebus/src/vespa/messagebus/routablequeue.cpp
@@ -39,17 +39,17 @@ RoutableQueue::enqueue(Routable::UP r)
}
Routable::UP
-RoutableQueue::dequeue(uint32_t msTimeout)
+RoutableQueue::dequeue(duration msTimeout)
{
steady_clock::time_point startTime = steady_clock::now();
- uint64_t msLeft = msTimeout;
+ duration msLeft = msTimeout;
vespalib::MonitorGuard guard(_monitor);
- while (_queue.size() == 0 && msLeft > 0) {
+ while (_queue.size() == 0 && msLeft > duration::zero()) {
if (!guard.wait(msLeft) || _queue.size() > 0) {
break;
}
- uint64_t elapsed = duration_cast<milliseconds>(steady_clock::now() - startTime).count();
- msLeft = (elapsed > msTimeout) ? 0 : msTimeout - elapsed;
+ duration elapsed = (steady_clock::now() - startTime);
+ msLeft = (elapsed > msTimeout) ? duration::zero() : msTimeout - elapsed;
}
if (_queue.size() == 0) {
return Routable::UP();
diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h
index c0ed35dece8..153686b1669 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.h
+++ b/messagebus/src/vespa/messagebus/routablequeue.h
@@ -69,7 +69,8 @@ public:
* @return the dequeued routable
* @param msTimeout how long to wait if the queue is empty
**/
- Routable::UP dequeue(uint32_t msTimeout);
+ Routable::UP dequeue(duration timeout);
+ Routable::UP dequeue() { return dequeue(duration::zero());}
/**
* Handle a Message by enqueuing it.
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 41fe01625ae..eece44a922a 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -9,8 +9,6 @@
using vespalib::make_string;
-using namespace std::chrono_literals;
-using namespace std::chrono;
namespace mbus {
@@ -75,7 +73,7 @@ SourceSession::send(Message::UP msg)
{
msg->setTimeReceivedNow();
if (msg->getTimeRemaining() == 0ms) {
- msg->setTimeRemaining(duration_cast<milliseconds>(_timeout));
+ msg->setTimeRemaining(_timeout);
}
{
vespalib::MonitorGuard guard(_monitor);
@@ -145,10 +143,10 @@ SourceSession::close()
}
SourceSession &
-SourceSession::setTimeout(double timeout)
+SourceSession::setTimeout(duration timeout)
{
vespalib::MonitorGuard guard(_monitor);
- _timeout = seconds(timeout);
+ _timeout = timeout;
return *this;
}
diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h
index 31ebec3555e..0992a3e377b 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.h
+++ b/messagebus/src/vespa/messagebus/sourcesession.h
@@ -27,7 +27,7 @@ private:
Sequencer _sequencer;
IReplyHandler &_replyHandler;
IThrottlePolicy::SP _throttlePolicy;
- seconds _timeout;
+ duration _timeout;
uint32_t _pendingCount;
bool _closed;
bool _done;
@@ -120,7 +120,7 @@ public:
* @param timeout The numer of seconds allowed.
* @return This, to allow chaining.
*/
- SourceSession &setTimeout(double timeout);
+ SourceSession &setTimeout(duration timeout);
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
index e7a99f2a1de..5b0c920f138 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
@@ -7,8 +7,8 @@ namespace mbus {
SourceSessionParams::SourceSessionParams() :
_replyHandler(nullptr),
- _throttlePolicy(new DynamicThrottlePolicy()),
- _timeout(180.0)
+ _throttlePolicy(std::make_shared<DynamicThrottlePolicy>()),
+ _timeout(180s)
{ }
IThrottlePolicy::SP
@@ -20,18 +20,12 @@ SourceSessionParams::getThrottlePolicy() const
SourceSessionParams &
SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy)
{
- _throttlePolicy = throttlePolicy;
+ _throttlePolicy = std::move(throttlePolicy);
return *this;
}
-seconds
-SourceSessionParams::getTimeout() const
-{
- return _timeout;
-}
-
SourceSessionParams &
-SourceSessionParams::setTimeout(seconds timeout)
+SourceSessionParams::setTimeout(duration timeout)
{
_timeout = timeout;
return *this;
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h
index 9ee17280d40..588b13a1bd8 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.h
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h
@@ -3,7 +3,6 @@
#include "ireplyhandler.h"
#include "ithrottlepolicy.h"
-#include <chrono>
namespace mbus {
@@ -18,7 +17,7 @@ class SourceSessionParams {
private:
IReplyHandler *_replyHandler;
IThrottlePolicy::SP _throttlePolicy;
- seconds _timeout;
+ duration _timeout;
public:
/**
@@ -46,14 +45,14 @@ public:
*
* @return The total timeout parameter.
*/
- seconds getTimeout() const;
+ duration getTimeout() const { return _timeout; }
/**
* Returns the number of seconds a message can spend trying to succeed.
*
* @return The timeout in seconds.
*/
- SourceSessionParams &setTimeout(seconds timeout);
+ SourceSessionParams &setTimeout(duration timeout);
/**
* Returns whether or not a reply handler has been assigned to this.
diff --git a/messagebus/src/vespa/messagebus/steadytimer.cpp b/messagebus/src/vespa/messagebus/steadytimer.cpp
index f64c8f361cd..942a1f4f051 100644
--- a/messagebus/src/vespa/messagebus/steadytimer.cpp
+++ b/messagebus/src/vespa/messagebus/steadytimer.cpp
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "steadytimer.h"
-#include <chrono>
+#include <vespa/vespalib/util/time.h>
using namespace std::chrono;
@@ -9,7 +9,7 @@ namespace mbus {
uint64_t
SteadyTimer::getMilliTime() const
{
- return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
+ return vespalib::count_ms(steady_clock::now().time_since_epoch());
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
index f98a4be05c3..01d644bba09 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
@@ -26,14 +26,13 @@ Receptor::handleReply(Reply::UP reply)
}
Message::UP
-Receptor::getMessage(double maxWait)
+Receptor::getMessage(duration maxWait)
{
- int64_t ms = (int64_t)(maxWait * 1000);
steady_clock::time_point startTime = steady_clock::now();
vespalib::MonitorGuard guard(_mon);
while (_msg.get() == 0) {
- int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count();
- if (w <= 0 || !guard.wait(w)) {
+ duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
+ if (w <= duration::zero() || !guard.wait(w)) {
break;
}
}
@@ -41,14 +40,13 @@ Receptor::getMessage(double maxWait)
}
Reply::UP
-Receptor::getReply(double maxWait)
+Receptor::getReply(duration maxWait)
{
- int64_t ms = (int)(maxWait * 1000);
steady_clock::time_point startTime = steady_clock::now();
vespalib::MonitorGuard guard(_mon);
while (_reply.get() == 0) {
- int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count();
- if (w <= 0 || !guard.wait(w)) {
+ duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
+ if (w <= duration::zero() || !guard.wait(w)) {
break;
}
}
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h
index ffc637fa90d..1d98ac62cd2 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.h
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.h
@@ -25,8 +25,10 @@ public:
~Receptor();
void handleMessage(Message::UP msg) override;
void handleReply(Reply::UP reply) override;
- Message::UP getMessage(double maxWait = 120.0);
- Reply::UP getReply(double maxWait = 120.0);
+ Message::UP getMessage(duration maxWait = 120s);
+ Reply::UP getReply(duration maxWait = 120s);
+ Message::UP getMessageNow() { return getMessage(duration::zero()); }
+ Reply::UP getReplyNow() { return getReply(duration::zero()); }
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index bbd23d52c0b..fe5baab3e40 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -4,6 +4,7 @@
#include "slobrok.h"
#include "slobrokstate.h"
#include <vespa/vespalib/component/vtag.h>
+#include <thread>
namespace mbus {
diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp
index f186be68d01..833d941da32 100644
--- a/messagebus_test/src/tests/error/cpp-client.cpp
+++ b/messagebus_test/src/tests/error/cpp-client.cpp
@@ -35,7 +35,7 @@ App::Main()
msg.reset(new SimpleMessage("test"));
msg->getTrace().setLevel(9);
ss->send(std::move(msg), "test");
- reply = src.getReply(600); // 10 minutes timeout
+ reply = src.getReply(600s); // 10 minutes timeout
if (reply.get() == 0) {
fprintf(stderr, "CPP-CLIENT: no reply\n");
} else {
diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp
index a804bef6785..0d4c622a0df 100644
--- a/messagebus_test/src/tests/trace/trace.cpp
+++ b/messagebus_test/src/tests/trace/trace.cpp
@@ -103,8 +103,8 @@ Test::Main()
Message::UP msg(new SimpleMessage("test"));
msg->getTrace().setLevel(1);
ss->send(std::move(msg), "test");
- reply = src.getReply(10);
- if (reply.get() != NULL) {
+ reply = src.getReply(10s);
+ if (reply) {
reply->getTrace().getRoot().normalize();
// resending breaks the trace, so retry until it has expected form
if (!reply->hasErrors() && reply->getTrace().getRoot().encode() == expect.encode()) {
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index d882d17841e..bd76b559490 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -47,7 +47,7 @@ public:
close();
}
- document::BucketId createAndSendSampleDocument(uint32_t timeout);
+ document::BucketId createAndSendSampleDocument(vespalib::duration timeout);
std::string getNodes(const std::string& infoString);
void sendReply(int idx = -1,
@@ -96,7 +96,7 @@ public:
PutOperationTest::~PutOperationTest() = default;
document::BucketId
-PutOperationTest::createAndSendSampleDocument(uint32_t timeout) {
+PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) {
auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::"));
document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId());
@@ -119,9 +119,11 @@ using RequirePrimaryWritten = bool;
}
+const vespalib::duration TIMEOUT = 180ms;
+
TEST_F(PutOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
"id:test:testdoctype1::, timestamp 100, size 45) => 0",
@@ -182,7 +184,7 @@ TEST_F(PutOperationTest, do_not_send_inline_split_if_not_configured) {
TEST_F(PutOperationTest, node_removed_on_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
"id:test:testdoctype1::, timestamp 100, size 45) => 0,"
@@ -206,7 +208,7 @@ TEST_F(PutOperationTest, node_removed_on_reply) {
TEST_F(PutOperationTest, storage_failed) {
setupDistributor(2, 1, "storage:1 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
sendReply(-1, api::ReturnCode::INTERNAL_FAILURE);
@@ -334,7 +336,7 @@ TEST_F(PutOperationTest, do_not_revert_on_failure_after_early_return) {
TEST_F(PutOperationTest, revert_successful_copies_when_one_fails) {
setupDistributor(3, 4, "storage:4 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true));
@@ -359,7 +361,7 @@ TEST_F(PutOperationTest, no_revert_if_revert_disabled) {
SetUp();
setupDistributor(3, 4, "storage:4 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true));
@@ -404,7 +406,7 @@ TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) {
TEST_F(PutOperationTest, no_storage_nodes) {
setupDistributor(2, 1, "storage:0 distributor:1");
- createAndSendSampleDocument(180);
+ createAndSendSampleDocument(TIMEOUT);
ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
"timestamp 100) ReturnCode(NOT_CONNECTED, "
"Can't store document: No storage nodes available)",
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 67ef3374633..788ac1960dd 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -266,7 +266,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
makeDocumentBucket(document::BucketId(0)), update, api::Timestamp(0)));
// Misc settings for checking that propagation works.
msg->getTrace().setLevel(6);
- msg->setTimeout(6789);
+ msg->setTimeout(6789ms);
msg->setPriority(99);
if (options._timestampToUpdate) {
msg->setOldTimestamp(options._timestampToUpdate);
@@ -517,7 +517,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
{
// Settings set in sendUpdate().
EXPECT_EQ(6, msg->getTrace().getLevel());
- EXPECT_EQ(6789, msg->getTimeout());
+ EXPECT_EQ(6789ms, msg->getTimeout());
EXPECT_EQ(99, msg->getPriority());
}
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index 3bb86eaebd9..5d7871376cb 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -47,7 +47,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
document::BucketId superBucket,
document::BucketId lastBucket,
uint32_t maxBuckets = 8,
- uint32_t timeoutMS = 500,
+ vespalib::duration timeout = 500ms,
bool visitInconsistentBuckets = false,
bool visitRemoves = false,
std::string libraryName = "dumpvisitor",
@@ -69,7 +69,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
cmd->setMaximumPendingReplyCount(VisitorOperationTest::MAX_PENDING);
cmd->setMaxBucketsPerVisitor(maxBuckets);
- cmd->setTimeout(timeoutMS);
+ cmd->setTimeout(timeout);
if (visitInconsistentBuckets) {
cmd->setVisitInconsistentBuckets();
}
@@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
msg->addBucketToBeVisited(nullId);
msg->setFieldSet("[header]");
msg->setVisitRemoves();
- msg->setTimeout(1234);
+ msg->setTimeout(1234ms);
msg->getTrace().setLevel(7);
auto op = createOpWithDefaultConfig(std::move(msg));
@@ -203,7 +203,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState)
EXPECT_GT(cvc->getToTime(), 0);
EXPECT_EQ("[header]", cvc->getFieldSet());
EXPECT_TRUE(cvc->visitRemoves());
- EXPECT_EQ(1234, cvc->getTimeout());
+ EXPECT_EQ(1234ms, cvc->getTimeout());
EXPECT_EQ(7, cvc->getTrace().getLevel());
sendReply(*op);
@@ -285,7 +285,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) {
addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20));
+ createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms));
op->start(_sender, framework::MilliSecTime(0));
@@ -331,7 +331,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) {
userid,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -356,7 +356,7 @@ VisitorOperationTest::runVisitor(document::BucketId id,
id,
lastId,
maxBuckets,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -448,13 +448,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) {
addNodesToBucketDB(id, "0=0/0/0/1/2/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("emptybucket",
- id,
- nullId,
- 8,
- 500,
- false,
- true));
+ createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true));
op->start(_sender, framework::MilliSecTime(0));
@@ -534,7 +528,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) {
document::BucketId(16, 1),
nullId,
8,
- 500)); // ms timeout
+ 500ms)); // ms timeout
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0,Visitor Create => 1",
@@ -607,7 +601,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) {
id,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -633,7 +627,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) {
id,
nullId,
8,
- 500,
+ 500ms,
false,
false,
"dumpvisitor",
@@ -829,7 +823,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) {
_sender.clear();
auto op = createOpWithConfig(
- createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500, true),
+ createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true),
VisitorOperation::Config(5, 4));
op->start(_sender, framework::MilliSecTime(0));
@@ -988,7 +982,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent)
id,
nullId,
8,
- 500,
+ 500ms,
inconsistent));
op->start(_sender, framework::MilliSecTime(0));
@@ -1040,13 +1034,13 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) {
addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t");
auto op = createOpWithDefaultConfig(
- createVisitorCommand("foo", id, nullId, 8, 10000));
+ createVisitorCommand("foo", id, nullId, 8, 10000ms));
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0));
- EXPECT_EQ(5000, cmd.getQueueTimeout());
+ EXPECT_EQ(5000ms, cmd.getQueueTimeout());
}
void
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 44cb92071a1..b46c0236150 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -856,7 +856,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(0);
- cmd->setTimeout(50);
+ cmd->setTimeout(50ms);
filestorHandler.schedule(cmd, 0);
}
@@ -865,7 +865,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3);
cmd->setAddress(*address);
cmd->setPriority(200);
- cmd->setTimeout(10000);
+ cmd->setTimeout(10000ms);
filestorHandler.schedule(cmd, 0);
}
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
index 16b43828120..bc52d7508dc 100644
--- a/storage/src/tests/storageserver/documentapiconvertertest.cpp
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -195,7 +195,7 @@ TEST_F(DocumentApiConverterTest, create_visitor) {
EXPECT_EQ("myinstance", cmd->getInstanceId());
EXPECT_EQ("control-dest", cmd->getControlDestination());
EXPECT_EQ("data-dest", cmd->getDataDestination());
- EXPECT_EQ(123456u, cmd->getTimeout());
+ EXPECT_EQ(123456ms, cmd->getTimeout());
auto msg = toDocumentAPI<documentapi::CreateVisitorMessage>(*cmd);
EXPECT_EQ(defaultSpaceName, msg->getBucketSpace());
@@ -210,7 +210,7 @@ TEST_F(DocumentApiConverterTest, create_visitor_high_timeout) {
EXPECT_EQ("myinstance", cmd->getInstanceId());
EXPECT_EQ("control-dest", cmd->getControlDestination());
EXPECT_EQ("data-dest", cmd->getDataDestination());
- EXPECT_EQ(std::numeric_limits<int32_t>::max(), cmd->getTimeout());
+ EXPECT_EQ(std::numeric_limits<int32_t>::max(), vespalib::count_ms(cmd->getTimeout()));
}
TEST_F(DocumentApiConverterTest, create_visitor_reply_not_ready) {
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 30ad9b58e9f..178862d8393 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -267,7 +267,7 @@ TEST_F(MergeThrottlerTest, chain) {
}
auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123);
cmd->setPriority(7);
- cmd->setTimeout(54321);
+ cmd->setTimeout(54321ms);
StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
cmd->setAddress(address);
const uint16_t distributorIndex = 123;
@@ -306,7 +306,7 @@ TEST_F(MergeThrottlerTest, chain) {
// Ensure priority, cluster state version and timeout is correctly forwarded
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -332,7 +332,7 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
_topLinks[executorNode]->sendDown(fwd);
}
@@ -359,7 +359,7 @@ TEST_F(MergeThrottlerTest, chain) {
fwd = _bottomLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET);
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
auto reply = std::make_shared<MergeBucketReply>(dynamic_cast<const MergeBucketCommand&>(*fwd));
reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<"));
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index c2074c53dd7..f88b59f50a5 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -246,7 +246,7 @@ void StateManagerTest::mark_reported_node_state_up() {
void StateManagerTest::send_down_get_node_state_request(uint16_t controller_index) {
auto cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::UP));
- cmd->setTimeout(10000000);
+ cmd->setTimeout(10000000ms);
cmd->setSourceIndex(controller_index);
_upper->sendDown(cmd);
}
@@ -320,7 +320,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
force_current_cluster_state_version(12345);
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
- cmd->setTimeout(10000000);
+ cmd->setTimeout(10000000ms);
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
index c152e4c5191..359a242ff5d 100644
--- a/storage/src/tests/visiting/commandqueuetest.cpp
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -16,14 +16,13 @@ namespace storage {
namespace {
std::shared_ptr<api::CreateVisitorCommand> getCommand(
- vespalib::stringref name, int timeout,
+ vespalib::stringref name, vespalib::duration timeout,
uint8_t priority = 0)
{
vespalib::asciistream ost;
- ost << name << " t=" << timeout << " p=" << static_cast<unsigned int>(priority);
+ ost << name << " t=" << vespalib::count_ms(timeout) << " p=" << static_cast<unsigned int>(priority);
// Piggyback name in document selection
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "", "", ost.str()));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "", "", ost.str());
cmd->setQueueTimeout(timeout);
cmd->setPriority(priority);
return cmd;
@@ -43,13 +42,13 @@ TEST(CommandQueueTest, fifo) {
ASSERT_TRUE(queue.empty());
// Use all default priorities, meaning what comes out should be in the same order
// as what went in
- queue.add(getCommand("first", 1));
- queue.add(getCommand("second", 10));
- queue.add(getCommand("third", 5));
- queue.add(getCommand("fourth", 0));
- queue.add(getCommand("fifth", 3));
- queue.add(getCommand("sixth", 14));
- queue.add(getCommand("seventh", 7));
+ queue.add(getCommand("first", 1ms));
+ queue.add(getCommand("second", 10ms));
+ queue.add(getCommand("third", 5ms));
+ queue.add(getCommand("fourth", 0ms));
+ queue.add(getCommand("fifth", 3ms));
+ queue.add(getCommand("sixth", 14ms));
+ queue.add(getCommand("seventh", 7ms));
ASSERT_FALSE(queue.empty());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
@@ -74,16 +73,16 @@ TEST(CommandQueueTest, fifo_with_priorities) {
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 1, 10));
+ queue.add(getCommand("first", 1ms, 10));
EXPECT_EQ("first t=1 p=10", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("second", 10, 22));
- queue.add(getCommand("third", 5, 9));
+ queue.add(getCommand("second", 10ms, 22));
+ queue.add(getCommand("third", 5ms, 9));
EXPECT_EQ("second t=10 p=22", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("fourth", 0, 22));
- queue.add(getCommand("fifth", 3, 22));
+ queue.add(getCommand("fourth", 0ms, 22));
+ queue.add(getCommand("fifth", 3ms, 22));
EXPECT_EQ("fifth t=3 p=22", getCommandString(queue.peekLowestPriorityCommand()));
- queue.add(getCommand("sixth", 14, 50));
- queue.add(getCommand("seventh", 7, 0));
+ queue.add(getCommand("sixth", 14ms, 50));
+ queue.add(getCommand("seventh", 7ms, 0));
EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand()));
@@ -111,19 +110,19 @@ TEST(CommandQueueTest, release_oldest) {
framework::defaultimplementation::FakeClock clock(framework::defaultimplementation::FakeClock::FAKE_ABSOLUTE);
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 10));
- queue.add(getCommand("second", 100));
- queue.add(getCommand("third", 1000));
- queue.add(getCommand("fourth", 5));
- queue.add(getCommand("fifth", 3000));
- queue.add(getCommand("sixth", 400));
- queue.add(getCommand("seventh", 700));
+ queue.add(getCommand("first", 10ms));
+ queue.add(getCommand("second", 100ms));
+ queue.add(getCommand("third", 1000ms));
+ queue.add(getCommand("fourth", 5ms));
+ queue.add(getCommand("fifth", 3000ms));
+ queue.add(getCommand("sixth", 400ms));
+ queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry;
std::list<CommandEntry> timedOut(queue.releaseTimedOut());
ASSERT_TRUE(timedOut.empty());
- clock.addMilliSecondsToTime(400 * 1000);
+ clock.addMilliSecondsToTime(400);
timedOut = queue.releaseTimedOut();
ASSERT_EQ(4, timedOut.size());
std::ostringstream ost;
@@ -144,13 +143,13 @@ TEST(CommandQueueTest, release_lowest_priority) {
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 1, 10));
- queue.add(getCommand("second", 10, 22));
- queue.add(getCommand("third", 5, 9));
- queue.add(getCommand("fourth", 0, 22));
- queue.add(getCommand("fifth", 3, 22));
- queue.add(getCommand("sixth", 14, 50));
- queue.add(getCommand("seventh", 7, 0));
+ queue.add(getCommand("first", 1ms, 10));
+ queue.add(getCommand("second", 10ms, 22));
+ queue.add(getCommand("third", 5ms, 9));
+ queue.add(getCommand("fourth", 0ms, 22));
+ queue.add(getCommand("fifth", 3ms, 22));
+ queue.add(getCommand("sixth", 14ms, 50));
+ queue.add(getCommand("seventh", 7ms, 0));
ASSERT_EQ(7u, queue.size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
@@ -177,13 +176,13 @@ TEST(CommandQueueTest, delete_iterator) {
framework::defaultimplementation::FakeClock clock;
CommandQueue<api::CreateVisitorCommand> queue(clock);
ASSERT_TRUE(queue.empty());
- queue.add(getCommand("first", 10));
- queue.add(getCommand("second", 100));
- queue.add(getCommand("third", 1000));
- queue.add(getCommand("fourth", 5));
- queue.add(getCommand("fifth", 3000));
- queue.add(getCommand("sixth", 400));
- queue.add(getCommand("seventh", 700));
+ queue.add(getCommand("first", 10ms));
+ queue.add(getCommand("second", 100ms));
+ queue.add(getCommand("third", 1000ms));
+ queue.add(getCommand("fourth", 5ms));
+ queue.add(getCommand("fifth", 3000ms));
+ queue.add(getCommand("sixth", 400ms));
+ queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin();
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 1275372b73b..b7eb7fee3ec 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -617,7 +617,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
_top->waitForMessages(i+1, 60);
}
@@ -629,7 +629,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -698,7 +698,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -730,7 +730,7 @@ TEST_F(VisitorManagerTest, abort_on_failed_visitor_info) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
}
@@ -765,7 +765,7 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) {
makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
+ cmd->setQueueTimeout(0ms);
_top->sendDown(cmd);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ILLEGAL_PARAMETERS));
@@ -782,8 +782,8 @@ TEST_F(VisitorManagerTest, visitor_queue_timeout) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(1);
- cmd->setTimeout(100 * 1000 * 1000);
+ cmd->setQueueTimeout(1ms);
+ cmd->setTimeout(100 * 1000 * 1000ms);
_top->sendDown(cmd);
_node->getClock().addSecondsToTime(1000);
@@ -807,8 +807,8 @@ TEST_F(VisitorManagerTest, visitor_processing_timeout) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
- cmd->setQueueTimeout(0);
- cmd->setTimeout(100);
+ cmd->setQueueTimeout(0ms);
+ cmd->setTimeout(100ms);
_top->sendDown(cmd);
// Wait for Put before increasing the clock
@@ -825,7 +825,7 @@ namespace {
uint32_t nextVisitor = 0;
api::StorageMessage::Id
-sendCreateVisitor(uint32_t timeout, DummyStorageLink& top, uint8_t priority = 127) {
+sendCreateVisitor(vespalib::duration timeout, DummyStorageLink& top, uint8_t priority = 127) {
std::ostringstream ost;
ost << "testvis" << ++nextVisitor;
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
@@ -851,25 +851,25 @@ TEST_F(VisitorManagerTest, prioritized_visitor_queing) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- ids[i] = sendCreateVisitor(i, *_top, i);
+ ids[i] = sendCreateVisitor(i*1ms, *_top, i);
}
// Next ones should be queued - (Better not finish before we get here)
// Submit with higher priorities
for (uint32_t i = 0; i < 4; ++i) {
- ids[i + 4] = sendCreateVisitor(1000, *_top, 100 - i);
+ ids[i + 4] = sendCreateVisitor(1000ms, *_top, 100 - i);
}
// Queue is now full with a pri 100 visitor at its end
// Send a lower pri visitor that will be busy-returned immediately
- ids[8] = sendCreateVisitor(1000, *_top, 130);
+ ids[8] = sendCreateVisitor(1000ms, *_top, 130);
uint64_t message_id = 0;
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[8], message_id);
// Send a higher pri visitor that will take the place of pri 100 visitor
- ids[9] = sendCreateVisitor(1000, *_top, 60);
+ ids[9] = sendCreateVisitor(1000ms, *_top, 60);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[4], message_id);
@@ -917,44 +917,44 @@ TEST_F(VisitorManagerTest, prioritized_max_concurrent_visitors) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- ids[i] = sendCreateVisitor(i, *_top, i);
+ ids[i] = sendCreateVisitor(i*1ms, *_top, i);
}
// Low pri messages; get put into queue
for (uint32_t i = 0; i < 6; ++i) {
- ids[i + 4] = sendCreateVisitor(1000, *_top, 203 - i);
+ ids[i + 4] = sendCreateVisitor(1000ms, *_top, 203 - i);
}
// Higher pri message: fits happily into 1 extra concurrent slot
- ids[10] = sendCreateVisitor(1000, *_top, 190);
+ ids[10] = sendCreateVisitor(1000ms, *_top, 190);
// Should punch pri203 msg out of the queue -> busy
- ids[11] = sendCreateVisitor(1000, *_top, 197);
+ ids[11] = sendCreateVisitor(1000ms, *_top, 197);
uint64_t message_id = 0;
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[4], message_id);
// No concurrency slots left for this message -> busy
- ids[12] = sendCreateVisitor(1000, *_top, 204);
+ ids[12] = sendCreateVisitor(1000ms, *_top, 204);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[12], message_id);
// Gets a concurrent slot
- ids[13] = sendCreateVisitor(1000, *_top, 80);
+ ids[13] = sendCreateVisitor(1000ms, *_top, 80);
// Kicks pri 202 out of the queue -> busy
- ids[14] = sendCreateVisitor(1000, *_top, 79);
+ ids[14] = sendCreateVisitor(1000ms, *_top, 79);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id));
ASSERT_EQ(ids[5], message_id);
// Gets a concurrent slot
- ids[15] = sendCreateVisitor(1000, *_top, 63);
+ ids[15] = sendCreateVisitor(1000ms, *_top, 63);
// Very Important Visitor(tm) gets a concurrent slot
- ids[16] = sendCreateVisitor(1000, *_top, 0);
+ ids[16] = sendCreateVisitor(1000ms, *_top, 0);
std::vector<document::Document::SP> docs;
std::vector<document::DocumentId> docIds;
@@ -1018,11 +1018,11 @@ TEST_F(VisitorManagerTest, visitor_queing_zero_queue_size) {
// First 4 should just start..
for (uint32_t i = 0; i < 4; ++i) {
- sendCreateVisitor(i, *_top, i);
+ sendCreateVisitor(i * 1ms, *_top, i);
}
// Queue size is zero, all visitors will be busy-returned
for (uint32_t i = 0; i < 5; ++i) {
- sendCreateVisitor(1000, *_top, 100 - i);
+ sendCreateVisitor(1000ms, *_top, 100 - i);
ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY));
}
for (uint32_t session = 0; session < 4; ++session) {
@@ -1037,8 +1037,8 @@ TEST_F(VisitorManagerTest, status_page) {
_manager->setMaxConcurrentVisitors(1, 1);
_manager->setMaxVisitorQueueSize(6);
// 1 running, 1 queued
- sendCreateVisitor(1000000, *_top, 1);
- sendCreateVisitor(1000000, *_top, 128);
+ sendCreateVisitor(1000000ms, *_top, 1);
+ sendCreateVisitor(1000000ms, *_top, 128);
{
TestVisitorMessageSession& session = getSession(0);
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index 3aadce4d18e..6836c738b3a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -76,9 +76,7 @@ VisitorOperation::VisitorOperation(
}
}
-VisitorOperation::~VisitorOperation()
-{
-}
+VisitorOperation::~VisitorOperation() = default;
document::BucketId
VisitorOperation::getLastBucketVisited()
@@ -121,22 +119,21 @@ VisitorOperation::getLastBucketVisited()
return newLastBucket;
}
-uint64_t
+vespalib::duration
VisitorOperation::timeLeft() const noexcept
{
const auto elapsed = _operationTimer.getElapsedTime();
- framework::MilliSecTime timeSpent(
- std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
+
LOG(spam,
- "Checking if visitor has timed out: elapsed=%" PRIu64 " ms, timeout=%u ms",
- timeSpent.getTime(),
- _msg->getTimeout());
+ "Checking if visitor has timed out: elapsed=%ld ms, timeout=%ld ms",
+ vespalib::count_ms(elapsed),
+ vespalib::count_ms(_msg->getTimeout()));
- if (timeSpent.getTime() >= _msg->getTimeout()) {
- return 0;
+ if (elapsed >= _msg->getTimeout()) {
+ return vespalib::duration::zero();
} else {
- return _msg->getTimeout() - timeSpent.getTime();
+ return _msg->getTimeout() - elapsed;
}
}
@@ -581,7 +578,7 @@ VisitorOperation::onStart(DistributorMessageSender& sender)
bool
VisitorOperation::shouldAbortDueToTimeout() const noexcept
{
- return timeLeft() == 0;
+ return timeLeft() <= vespalib::duration::zero();
}
void
@@ -629,8 +626,8 @@ VisitorOperation::startNewVisitors(DistributorMessageSender& sender)
markOperationAsFailed(
api::ReturnCode(api::ReturnCode::ABORTED,
vespalib::make_string(
- "Timeout of %u ms is running out",
- _msg->getTimeout())));
+ "Timeout of %ld ms is running out",
+ vespalib::count_ms(_msg->getTimeout()))));
}
if (maySendNewStorageVisitors()) {
@@ -782,7 +779,7 @@ VisitorOperation::sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
return visitorsSent;
}
-uint32_t
+vespalib::duration
VisitorOperation::computeVisitorQueueTimeoutMs() const noexcept
{
return timeLeft() / 2;
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index ebb5ed4c6aa..fdfe60731f5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -108,17 +108,15 @@ private:
bool allowInconsistencies() const noexcept;
bool shouldAbortDueToTimeout() const noexcept;
bool assignBucketsToNodes(NodeToBucketsMap& nodeToBucketsMap);
- int getNumVisitorsToSendForNode(uint16_t node,
- uint32_t totalBucketsOnNode) const;
- uint32_t computeVisitorQueueTimeoutMs() const noexcept;
+ int getNumVisitorsToSendForNode(uint16_t node, uint32_t totalBucketsOnNode) const;
+ vespalib::duration computeVisitorQueueTimeoutMs() const noexcept;
bool sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap,
DistributorMessageSender& sender);
void sendStorageVisitor(uint16_t node,
const std::vector<document::BucketId>& buckets,
uint32_t pending,
DistributorMessageSender& sender);
- void markCompleted(const document::BucketId& bid,
- const api::ReturnCode& code);
+ void markCompleted(const document::BucketId& bid, const api::ReturnCode& code);
/**
* Operation failed and we can pin the blame on a specific node. Updates
* internal error code and augments error message with the index of the
@@ -138,7 +136,7 @@ private:
* time point. In case of the current time having passed the timeout
* point, function returns 0.
*/
- uint64_t timeLeft() const noexcept;
+ vespalib::duration timeLeft() const noexcept;
DistributorComponent& _owner;
DistributorBucketSpace &_bucketSpace;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 52a4a5c195c..130e039a43e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -77,7 +77,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
std::shared_ptr<api::JoinBucketsCommand> msg(
new api::JoinBucketsCommand(getBucket()));
msg->getSourceBuckets() = node.second;
- msg->setTimeout(INT_MAX);
+ msg->setTimeout(vespalib::duration::max());
setCommandMeta(*msg);
_tracker.queueCommand(msg, node.first);
}
@@ -90,8 +90,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
- LOG(debug, "Ignored reply since node was max uint16_t for unknown "
- "reasons");
+ LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
return;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 66ce4fc0485..445d0972937 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -155,7 +155,7 @@ MergeOperation::onStart(DistributorMessageSender& sender)
// Set timeout to one hour to prevent hung nodes that manage to keep
// connections open from stalling merges in the cluster indefinitely.
- msg->setTimeout(60 * 60 * 1000);
+ msg->setTimeout(3600s);
setCommandMeta(*msg);
sender.sendToNode(lib::NodeType::STORAGE, _mnodes[0].index, msg);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 1b40f744a80..57f8bc92316 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -20,7 +20,7 @@ SplitOperation::SplitOperation(const std::string& clusterName, const BucketAndNo
_splitCount(splitCount),
_splitSize(splitSize)
{}
-SplitOperation::~SplitOperation() {}
+SplitOperation::~SplitOperation() = default;
void
SplitOperation::onStart(DistributorMessageSender& sender)
@@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorMessageSender& sender)
msg->setMaxSplitBits(_maxBits);
msg->setMinDocCount(_splitCount);
msg->setMinByteSize(_splitSize);
- msg->setTimeout(INT_MAX);
+ msg->setTimeout(vespalib::duration::max());
setCommandMeta(*msg);
_tracker.queueCommand(msg, entry->getNodeRef(i).getNode());
_ok = true;
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 8298b126690..62a520abc87 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -230,7 +230,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
distributionHash));
cmd->setPriority(api::StorageMessage::HIGH);
- cmd->setTimeout(INT_MAX);
+ cmd->setTimeout(vespalib::duration::max());
_sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index d1631c50880..f773ee774bb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -355,7 +355,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
}
bool
-FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime)
+FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime)
{
if (msg.getType().isReply()) {
return false; // Replies must always be processed and cannot time out.
@@ -980,7 +980,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
return lck;
}
- uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
+ std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
if (!messageTimedOutInQueue(m, waitTime)) {
std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command);
@@ -1004,7 +1004,7 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
api::StorageMessage & m(*iter->_command);
- uint64_t waitTime(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
+ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])));
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
document::Bucket bucket(iter->_bucket);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index fd6ab5e8b9a..5fc592e11cb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -325,7 +325,7 @@ private:
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
*/
- static bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime);
+ static bool messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime);
/**
* Creates and returns a reply with api::TIMEOUT return code for msg.
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index 24f4c9cd731..1efb42a7b22 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -16,7 +16,7 @@ MergeStatus::MergeStatus(framework::Clock& clock, const metrics::LoadType& lt,
context(lt, priority, traceLevel)
{}
-MergeStatus::~MergeStatus() {}
+MergeStatus::~MergeStatus() = default;
bool
MergeStatus::removeFromDiff(
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index 932859cd2d0..082ae053ec0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -24,7 +24,7 @@ public:
api::StorageMessage::Id pendingId;
std::shared_ptr<api::GetBucketDiffReply> pendingGetDiff;
std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff;
- uint32_t timeout;
+ vespalib::duration timeout;
framework::MilliSecTimer startTime;
spi::Context context;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 499d7ce15ac..978d434847e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -148,7 +148,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
//TODO: Can it be moved ?
std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand();
- cmd->setTimeout(storMsgPtr->getTimeRemaining().count());
+ cmd->setTimeout(storMsgPtr->getTimeRemaining());
cmd->setTrace(storMsgPtr->getTrace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
@@ -567,7 +567,7 @@ CommunicationManager::sendCommand(
cmd->setContext(mbus::Context(msg->getMsgId()));
cmd->setRetryEnabled(address.retryEnabled());
- cmd->setTimeRemaining(std::chrono::milliseconds(msg->getTimeout()));
+ cmd->setTimeRemaining(msg->getTimeout());
cmd->setTrace(msg->getTrace());
sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
break;
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index c6a16de3282..f0c987ee333 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -20,8 +20,6 @@ LOG_SETUP(".documentapiconverter");
using document::BucketSpace;
-using std::chrono::milliseconds;
-
namespace storage {
DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri,
@@ -140,9 +138,12 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg)
break;
}
- if (toMsg.get() != 0) {
- milliseconds timeout = std::min(milliseconds(INT_MAX), fromMsg.getTimeRemaining());
- toMsg->setTimeout(timeout.count());
+ if (toMsg) {
+ //TODO getTimeRemainingNow ?
+ vespalib::duration cappedTimeout = (fromMsg.getTimeRemaining() < 1ms*INT_MAX)
+ ? fromMsg.getTimeRemaining()
+ : 1ms*INT_MAX;
+ toMsg->setTimeout(cappedTimeout);
toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority()));
toMsg->setLoadType(fromMsg.getLoadType());
@@ -308,8 +309,8 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg)
break;
}
- if (toMsg.get()) {
- toMsg->setTimeRemaining(milliseconds(fromMsg.getTimeout()));
+ if (toMsg) {
+ toMsg->setTimeRemaining(fromMsg.getTimeout());
toMsg->setContext(mbus::Context(fromMsg.getMsgId()));
if (LOG_WOULD_LOG(spam)) {
toMsg->getTrace().setLevel(9);
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index c86e1671033..45bd9c64fac 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -152,7 +152,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req)
: std::unique_ptr<lib::NodeState>()));
cmd->setPriority(api::StorageMessage::VERYHIGH);
- cmd->setTimeout(req->GetParams()->GetValue(1)._intval32);
+ cmd->setTimeout(std::chrono::milliseconds(req->GetParams()->GetValue(1)._intval32));
if (req->GetParams()->GetNumValues() > 2) {
cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32);
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index af01a880fea..9afc8b2d3a5 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -460,14 +460,15 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd)
&& (*cmd->getExpectedState() == *_nodeState || sentReply)
&& is_up_to_date)
{
+ int64_t msTimeout = vespalib::count_ms(cmd->getTimeout());
LOG(debug, "Received get node state request with timeout of "
- "%u milliseconds. Scheduling to be answered in "
- "%u milliseconds unless a node state change "
+ "%ld milliseconds. Scheduling to be answered in "
+ "%ld milliseconds unless a node state change "
"happens before that time.",
- cmd->getTimeout(), cmd->getTimeout() * 800 / 1000);
+ msTimeout, msTimeout * 800 / 1000);
TimeStatePair pair(
_component.getClock().getTimeInMillis()
- + framework::MilliSecTime(cmd->getTimeout() * 800 / 1000),
+ + framework::MilliSecTime(msTimeout * 800 / 1000),
cmd);
_queuedStateRequests.emplace_back(std::move(pair));
} else {
diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h
index d129506eb64..ce309d55803 100644
--- a/storage/src/vespa/storage/visiting/commandqueue.h
+++ b/storage/src/vespa/storage/visiting/commandqueue.h
@@ -16,6 +16,7 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <vespa/vespalib/util/printable.h>
+#include <vespa/vespalib//util/time.h>
#include <vespa/fastos/timestamp.h>
#include <vespa/storageframework/generic/clock/clock.h>
#include <list>
@@ -141,11 +142,10 @@ CommandQueue<Command>::peekNextCommand() const
template<class Command>
void
-CommandQueue<Command>::add(
- const std::shared_ptr<Command>& cmd)
+CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd)
{
framework::MicroSecTime time(_clock.getTimeInMicros()
- + framework::MicroSecTime(cmd->getQueueTimeout() * 1000000));
+ + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout())));
_commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority()));
}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 6330b580eb9..d4a176fa14b 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -309,7 +309,7 @@ VisitorManager::scheduleVisitor(
if (_enforceQueueUse || totCount >= maximumConcurrent(*cmd)) {
api::CreateVisitorCommand::SP failCommand;
- if (cmd->getQueueTimeout() != 0 && _maxVisitorQueueSize > 0) {
+ if (cmd->getQueueTimeout() != vespalib::duration::zero() && _maxVisitorQueueSize > 0) {
if (_visitorQueue.size() < _maxVisitorQueueSize) {
// Still room in the queue
_visitorQueue.add(cmd);
@@ -348,7 +348,7 @@ VisitorManager::scheduleVisitor(
std::shared_ptr<api::CreateVisitorReply> reply(
new api::CreateVisitorReply(*failCommand));
std::ostringstream ost;
- if (cmd->getQueueTimeout() == 0) {
+ if (cmd->getQueueTimeout() == vespalib::duration::zero()) {
ost << "Already running the maximum amount ("
<< maximumConcurrent(*failCommand)
<< ") of visitors for this priority ("
@@ -632,7 +632,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
it->_command);
assert(cmd.get());
out << "<li>" << cmd->getInstanceId() << " - "
- << cmd->getQueueTimeout() << ", remaining timeout "
+ << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout "
<< (it->_time - time.getTime()) / 1000000 << " ms\n";
}
if (_visitorQueue.empty()) {
@@ -657,7 +657,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
<< "<td>" << it->first << "</td>"
<< "<td>" << it->second.id << "</td>"
<< "<td>" << it->second.timestamp << "</td>"
- << "<td>" << it->second.timeout << "</td>"
+ << "<td>" << vespalib::count_ms(it->second.timeout) << "</td>"
<< "<td>" << it->second.destination << "</td>"
<< "</tr>\n";
}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 3a3e743eaf2..3675a824e1d 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -55,7 +55,7 @@ private:
struct MessageInfo {
api::VisitorId id;
time_t timestamp;
- uint64_t timeout;
+ vespalib::duration timeout;
std::string destination;
};
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 142e7a89144..006af5edf7d 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -546,7 +546,7 @@ VisitorThread::onCreateVisitor(
std::move(messageSession),
documentPriority);
visitor->attach(cmd, *controlAddress, *dataAddress,
- framework::MilliSecTime(cmd->getTimeout()));
+ framework::MilliSecTime(vespalib::count_ms(cmd->getTimeout())));
} catch (std::exception& e) {
// We don't handle exceptions from this code, as we've
// added visitor to internal structs we'll end up calling
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index b1a754bbbab..dbd79e4fcca 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -522,7 +522,7 @@ TEST_P(StorageProtocolTest, create_visitor) {
cmd->getBuckets() = buckets;
cmd->setFieldSet("foo,bar,vekterli");
cmd->setVisitInconsistentBuckets();
- cmd->setQueueTimeout(100);
+ cmd->setQueueTimeout(100ms);
cmd->setPriority(149);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("library", cmd2->getLibraryName());
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 466ff85f398..b90153c9517 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -44,7 +44,7 @@ ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const
api::GetCommand::UP msg(
new api::GetCommand(bucket, did, headerOnly ? "[header]" : "[all]", beforeTimestamp));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -64,7 +64,7 @@ ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const
api::Timestamp timestamp(SH::getLong(buf));
api::RemoveCommand::UP msg(new api::RemoveCommand(bucket, did, timestamp));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -88,7 +88,7 @@ ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const
}
api::RevertCommand::UP msg(new api::RevertCommand(bucket, tokens));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -104,7 +104,7 @@ ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const
document::Bucket bucket = getBucket(buf);
api::CreateBucketCommand::UP msg(new api::CreateBucketCommand(bucket));
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -138,7 +138,7 @@ ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const
api::MergeBucketCommand::UP msg(
new api::MergeBucketCommand(bucket, nodes, timestamp));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -187,7 +187,7 @@ ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const
onDecodeDiffEntry(buf, entries[i]);
}
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -259,7 +259,7 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const
entries[i]._bodyBlob.size());
}
onDecodeBucketInfoCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -291,7 +291,7 @@ ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd,
entry._info = getBucketInfo(buf);
}
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -326,7 +326,7 @@ ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd,
api::NotifyBucketChangeReply::UP msg(new api::NotifyBucketChangeReply(
static_cast<const api::NotifyBucketChangeCommand&>(cmd)));
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -350,7 +350,7 @@ ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const
msg->setMinByteSize(SH::getInt(buf));
msg->setMinDocCount(SH::getInt(buf));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void ProtocolSerialization4_2::onEncode(
@@ -403,7 +403,7 @@ ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand&
buf.putBoolean(msg.visitRemoves());
buf.putBoolean(msg.getFieldSet() == "[header]");
buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(msg.getQueueTimeout());
+ buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
uint32_t size = msg.getParameters().getSerializedSize();
char* docBuffer = buf.allocate(size);
@@ -449,12 +449,12 @@ ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const
if (SH::getBoolean(buf)) {
msg->setVisitInconsistentBuckets();
}
- msg->setQueueTimeout(SH::getInt(buf));
+ msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
msg->getParameters().deserialize(getTypeRepo(), buf);
onDecodeCommand(buf, *msg);
msg->setVisitorDispatcherVersion(42);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -471,7 +471,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const
vespalib::stringref instanceId = SH::getString(buf);
api::DestroyVisitorCommand::UP msg(new api::DestroyVisitorCommand(instanceId));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -485,7 +485,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf
{
api::DestroyVisitorReply::UP msg(new api::DestroyVisitorReply(static_cast<const api::DestroyVisitorCommand&>(cmd)));
onDecodeReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
void
@@ -505,7 +505,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const
api::RemoveLocationCommand::UP msg;
msg.reset(new api::RemoveLocationCommand(documentSelection, bucket));
onDecodeCommand(buf, *msg);
- return api::StorageCommand::UP(msg.release());
+ return msg;
}
void
@@ -519,7 +519,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf
{
api::RemoveLocationReply::UP msg(new api::RemoveLocationReply(static_cast<const api::RemoveLocationCommand&>(cmd)));
onDecodeBucketInfoReply(buf, *msg);
- return api::StorageReply::UP(msg.release());
+ return msg;
}
// Utility functions for serialization
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
index 32680b24683..b0a1685ed8c 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
@@ -132,7 +132,7 @@ ProtocolSerialization5_1::onEncode(
buf.putBoolean(msg.visitRemoves());
buf.putString(msg.getFieldSet());
buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(msg.getQueueTimeout());
+ buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
uint32_t size = msg.getParameters().getSerializedSize();
char* docBuffer = buf.allocate(size);
@@ -181,7 +181,7 @@ ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const
if (SH::getBoolean(buf)) {
msg->setVisitInconsistentBuckets();
}
- msg->setQueueTimeout(SH::getInt(buf));
+ msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
msg->getParameters().deserialize(getTypeRepo(), buf);
onDecodeCommand(buf, *msg);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index d0446f52893..bf56dd56db6 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -1162,7 +1162,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorComman
ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId());
ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size());
ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size());
- ctrl_meta->set_queue_timeout(msg.getQueueTimeout());
+ ctrl_meta->set_queue_timeout(vespalib::count_ms(msg.getQueueTimeout()));
ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount());
ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor());
@@ -1211,7 +1211,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu
cmd->setControlDestination(ctrl_meta.control_destination());
cmd->setDataDestination(ctrl_meta.data_destination());
cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
- cmd->setQueueTimeout(ctrl_meta.queue_timeout());
+ cmd->setQueueTimeout(std::chrono::milliseconds(ctrl_meta.queue_timeout()));
cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl
diff --git a/storageapi/src/vespa/storageapi/message/visitor.cpp b/storageapi/src/vespa/storageapi/message/visitor.cpp
index dbda2d0d0c2..aeb58f30fb4 100644
--- a/storageapi/src/vespa/storageapi/message/visitor.cpp
+++ b/storageapi/src/vespa/storageapi/message/visitor.cpp
@@ -34,7 +34,7 @@ CreateVisitorCommand::CreateVisitorCommand(document::BucketSpace bucketSpace,
_visitRemoves(false),
_fieldSet("[all]"),
_visitInconsistentBuckets(false),
- _queueTimeout(2000),
+ _queueTimeout(2000ms),
_maxPendingReplyCount(2),
_version(50),
_maxBucketsPerVisitor(1)
@@ -82,15 +82,12 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose,
out << ") {";
out << "\n" << indent << " Library name: '" << _libName << "'";
out << "\n" << indent << " Instance Id: '" << _instanceId << "'";
- out << "\n" << indent << " Control Destination: '"
- << _controlDestination << "'";
- out << "\n" << indent << " Data Destination: '"
- << _dataDestination << "'";
+ out << "\n" << indent << " Control Destination: '" << _controlDestination << "'";
+ out << "\n" << indent << " Data Destination: '" << _dataDestination << "'";
out << "\n" << indent << " Doc Selection: '" << _docSelection << "'";
- out << "\n" << indent << " Max pending: '"
- << _maxPendingReplyCount << "'";
- out << "\n" << indent << " Timeout: " << getTimeout();
- out << "\n" << indent << " Queue timeout: " << _queueTimeout << " ms";
+ out << "\n" << indent << " Max pending: '" << _maxPendingReplyCount << "'";
+ out << "\n" << indent << " Timeout: " << vespalib::count_ms(getTimeout()) << " ms";
+ out << "\n" << indent << " Queue timeout: " << vespalib::count_ms(_queueTimeout) << " ms";
out << "\n" << indent << " VisitorDispatcher version: '" << _version << "'";
if (visitRemoves()) {
out << "\n" << indent << " Visiting remove entries too";
@@ -109,8 +106,7 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose,
}
out << "\n" << indent << " ";
_params.print(out, verbose, indent + " ");
- out << "\n" << indent << " Max buckets: '"
- << _maxBucketsPerVisitor << "'";
+ out << "\n" << indent << " Max buckets: '" << _maxBucketsPerVisitor << "'";
out << "\n" << indent << "} : ";
StorageCommand::print(out, verbose, indent);
} else if (_buckets.size() == 2) {
diff --git a/storageapi/src/vespa/storageapi/message/visitor.h b/storageapi/src/vespa/storageapi/message/visitor.h
index f7dcaa63b20..7189cc67195 100644
--- a/storageapi/src/vespa/storageapi/message/visitor.h
+++ b/storageapi/src/vespa/storageapi/message/visitor.h
@@ -44,7 +44,7 @@ private:
vespalib::string _fieldSet;
bool _visitInconsistentBuckets;
- uint32_t _queueTimeout;
+ duration _queueTimeout;
uint32_t _maxPendingReplyCount;
uint32_t _version;
@@ -61,22 +61,17 @@ public:
~CreateVisitorCommand();
void setVisitorCmdId(uint32_t id) { _visitorCmdId = id; }
- void setControlDestination(vespalib::stringref d)
- { _controlDestination = d; }
+ void setControlDestination(vespalib::stringref d) { _controlDestination = d; }
void setDataDestination(vespalib::stringref d) { _dataDestination = d; }
void setParameters(const vdslib::Parameters& params) { _params = params; }
- void setMaximumPendingReplyCount(uint32_t count)
- { _maxPendingReplyCount = count; }
- void setFieldSet(vespalib::stringref fieldSet)
- { _fieldSet = fieldSet; }
+ void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
+ void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; }
void setVisitRemoves(bool value = true) { _visitRemoves = value; }
- void setVisitInconsistentBuckets(bool visitInconsistent = true)
- { _visitInconsistentBuckets = visitInconsistent; }
- void addBucketToBeVisited(const document::BucketId& id)
- { _buckets.push_back(id); }
+ void setVisitInconsistentBuckets(bool visitInconsistent = true) { _visitInconsistentBuckets = visitInconsistent; }
+ void addBucketToBeVisited(const document::BucketId& id) { _buckets.push_back(id); }
void setVisitorId(const VisitorId id) { _visitorId = id; }
void setInstanceId(vespalib::stringref id) { _instanceId = id; }
- void setQueueTimeout(uint32_t milliSecs) { _queueTimeout = milliSecs; }
+ void setQueueTimeout(duration milliSecs) { _queueTimeout = milliSecs; }
void setFromTime(Timestamp ts) { _fromTime = ts; }
void setToTime(Timestamp ts) { _toTime = ts; }
@@ -86,24 +81,20 @@ public:
document::Bucket getBucket() const override;
const vespalib::string & getLibraryName() const { return _libName; }
const vespalib::string & getInstanceId() const { return _instanceId; }
- const vespalib::string & getControlDestination() const
- { return _controlDestination; }
+ const vespalib::string & getControlDestination() const { return _controlDestination; }
const vespalib::string & getDataDestination() const { return _dataDestination; }
const vespalib::string & getDocumentSelection() const { return _docSelection; }
const vdslib::Parameters& getParameters() const { return _params; }
vdslib::Parameters& getParameters() { return _params; }
- uint32_t getMaximumPendingReplyCount() const
- { return _maxPendingReplyCount; }
- const std::vector<document::BucketId>& getBuckets() const
- { return _buckets; }
+ uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
+ const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
Timestamp getFromTime() const { return _fromTime; }
Timestamp getToTime() const { return _toTime; }
std::vector<document::BucketId>& getBuckets() { return _buckets; }
bool visitRemoves() const { return _visitRemoves; }
const vespalib::string& getFieldSet() const { return _fieldSet; }
bool visitInconsistentBuckets() const { return _visitInconsistentBuckets; }
- // In millisec
- uint32_t getQueueTimeout() const { return _queueTimeout; }
+ duration getQueueTimeout() const { return _queueTimeout; }
void setVisitorDispatcherVersion(uint32_t version) { _version = version; }
uint32_t getVisitorDispatcherVersion() const { return _version; }
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
index fe33066872a..d9bbf34141a 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storagecommand.h"
-#include <limits>
#include <vespa/vespalib/util/exceptions.h>
#include <ostream>
@@ -20,13 +19,13 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p)
// Default timeout is unlimited. Set from mbus message. Some internal
// use want unlimited timeout, (such as readbucketinfo, repair bucket
// etc)
- _timeout(std::numeric_limits<uint32_t>().max()),
+ _timeout(duration::max()),
_sourceIndex(0xFFFF)
{
setPriority(p);
}
-StorageCommand::~StorageCommand() { }
+StorageCommand::~StorageCommand() = default;
void
StorageCommand::print(std::ostream& out, bool verbose,
@@ -36,7 +35,7 @@ StorageCommand::print(std::ostream& out, bool verbose,
out << "StorageCommand(" << _type.getName();
if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority);
if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex;
- out << ", timeout = " << _timeout << " ms";
+ out << ", timeout = " << vespalib::count_ms(_timeout) << " ms";
out << ")";
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
index 2885dac3b91..c835168c5b7 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
@@ -19,7 +19,7 @@ namespace storage::api {
class StorageReply;
class StorageCommand : public StorageMessage {
- uint32_t _timeout; /** Timeout of command in milliseconds */
+ duration _timeout; /** Timeout of command in milliseconds */
/** Sets what node this message origins from. 0xFFFF is unset. */
uint16_t _sourceIndex;
@@ -37,9 +37,9 @@ public:
uint16_t getSourceIndex() const { return _sourceIndex; }
/** Set timeout in milliseconds. */
- void setTimeout(uint32_t milliseconds) { _timeout = milliseconds; }
+ void setTimeout(duration milliseconds) { _timeout = milliseconds; }
/** Get timeout in milliseconds. */
- uint32_t getTimeout() const { return _timeout; }
+ duration getTimeout() const { return _timeout; }
/** Used to set a new id so the message can be resent. */
void setNewId() { StorageMessage::setNewMsgId(); }
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index 8c2338a020c..e119884bd1f 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -70,6 +70,8 @@ public: \
namespace storage::api {
+using duration = vespalib::duration;
+
/**
* @class MessageType
* @ingroup messageapi
diff --git a/vespalib/src/vespa/vespalib/util/sync.h b/vespalib/src/vespa/vespalib/util/sync.h
index 12961dffef7..8458bc19629 100644
--- a/vespalib/src/vespa/vespalib/util/sync.h
+++ b/vespalib/src/vespa/vespalib/util/sync.h
@@ -378,7 +378,7 @@ public:
bool wait(int msTimeout) {
return wait(std::chrono::milliseconds(msTimeout));
}
- bool wait(std::chrono::milliseconds timeout) {
+ bool wait(std::chrono::nanoseconds timeout) {
return _cond->wait_for(_guard, timeout) == std::cv_status::no_timeout;
}
/**