diff options
Diffstat (limited to 'messagebus')
41 files changed, 299 insertions, 321 deletions
diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp index b18653d272d..f1557b8a305 100644 --- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp +++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp @@ -115,7 +115,7 @@ Test::Main() void Test::testAdvanced(TestData &data) { - const double TIMEOUT = 60; + const duration TIMEOUT = 60s; IProtocol::SP protocol(new SimpleProtocol()); SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol); simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE))); @@ -130,41 +130,41 @@ Test::testAdvanced(TestData &data) // Initial send. Message::UP msg = data._fooHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._fooSession->acknowledge(std::move(msg)); msg = data._barHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "bar")); data._barSession->reply(std::move(reply)); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz1")); data._bazSession->reply(std::move(reply)); // First retry. - msg = data._fooHandler.getMessage(0); + msg = data._fooHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); msg = data._barHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._barSession->acknowledge(std::move(msg)); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz2")); data._bazSession->reply(std::move(reply)); // Second retry. - msg = data._fooHandler.getMessage(0); + msg = data._fooHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); - msg = data._barHandler.getMessage(0); + msg = data._barHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::FATAL_ERROR, "baz3")); @@ -172,7 +172,7 @@ Test::testAdvanced(TestData &data) // Done. reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::FATAL_ERROR, reply->getError(0).getCode()); diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp index 105da2b3bd1..e10d87ed2ba 100644 --- a/messagebus/src/tests/choke/choke.cpp +++ b/messagebus/src/tests/choke/choke.cpp @@ -71,13 +71,13 @@ TestData::start() _srcSession = _srcServer.mb.createSourceSession(SourceSessionParams() .setThrottlePolicy(IThrottlePolicy::SP()) .setReplyHandler(_srcHandler)); - if (_srcSession.get() == NULL) { + if ( ! _srcSession) { return false; } _dstSession = _dstServer.mb.createDestinationSession(DestinationSessionParams() .setName("session") .setMessageHandler(_dstHandler)); - if (_dstSession.get() == NULL) { + if ( ! _dstSession) { return false; } if (!_srcServer.waitSlobrok("dst/session", 1u)) { @@ -108,7 +108,7 @@ Test::Main() TEST_DONE(); } -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; //////////////////////////////////////////////////////////////////////////////// // @@ -131,11 +131,11 @@ Test::testMaxCount(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); if (i < max) { Message::UP msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } else { Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); } @@ -146,14 +146,14 @@ Test::testMaxCount(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); msg = reply->getMessage(); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } while (!lst.empty()) { @@ -163,7 +163,7 @@ Test::testMaxCount(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } EXPECT_EQUAL(0u, data._dstServer.mb.getPendingCount()); @@ -185,11 +185,11 @@ Test::testMaxSize(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); if (i < max) { Message::UP msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } else { Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); } @@ -200,14 +200,14 @@ Test::testMaxSize(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); msg = reply->getMessage(); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } while (!lst.empty()) { @@ -217,7 +217,7 @@ Test::testMaxSize(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } EXPECT_EQUAL(0u, data._dstServer.mb.getPendingSize()); diff --git a/messagebus/src/tests/context/context.cpp b/messagebus/src/tests/context/context.cpp index de9dd1b83a6..39a7ca7b467 100644 --- a/messagebus/src/tests/context/context.cpp +++ b/messagebus/src/tests/context/context.cpp @@ -1,17 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/intermediatesession.h> #include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/routablequeue.h> #include <vespa/messagebus/routing/routingspec.h> #include <vespa/messagebus/sourcesession.h> #include <vespa/messagebus/sourcesessionparams.h> -#include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simplereply.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> @@ -24,7 +20,7 @@ struct Handler : public IMessageHandler Handler(MessageBus &mb) : session() { session = mb.createDestinationSession("session", true, *this); } - ~Handler() { + ~Handler() override { session.reset(); } void handleMessage(Message::UP msg) override { @@ -81,18 +77,18 @@ Test::Main() } EXPECT_EQUAL(queue.size(), 3u); { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 10u); } { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 20u); } { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 30u); } TEST_DONE(); diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp index 7434941a900..cf249e6eaec 100644 --- a/messagebus/src/tests/messagebus/messagebus.cpp +++ b/messagebus/src/tests/messagebus/messagebus.cpp @@ -21,7 +21,7 @@ struct Base { Base() : queue() {} virtual ~Base() { while (queue.size() > 0) { - Routable::UP r = queue.dequeue(0); + Routable::UP r = queue.dequeue(); r->getCallStack().discard(); } } @@ -219,8 +219,8 @@ Test::testSendToAny() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP msg = p->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = p->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); reply->addError(Error(ErrorCode::FATAL_ERROR, "")); @@ -229,8 +229,8 @@ Test::testSendToAny() } EXPECT_TRUE(client->waitQueueSize(300)); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1); } @@ -262,8 +262,8 @@ Test::testSendToCol() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -273,8 +273,8 @@ Test::testSendToCol() FastOS_Thread::Sleep(100); client->waitQueueSize(300); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); } @@ -296,8 +296,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -316,8 +316,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -328,8 +328,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -341,8 +341,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -350,8 +350,8 @@ Test::testSendToAnyThenCol() FastOS_Thread::Sleep(100); client->waitQueueSize(300); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); } @@ -423,8 +423,8 @@ void Test::assertDst(Search& dst) { ASSERT_TRUE(dst.waitQueueSize(1)); - Routable::UP msg = dst.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = dst.queue.dequeue(); + ASSERT_TRUE(msg); dst.session->acknowledge(Message::UP(static_cast<Message*>(msg.release()))); } @@ -432,8 +432,8 @@ void Test::assertItr(DocProc& itr) { ASSERT_TRUE(itr.waitQueueSize(1)); - Routable::UP msg = itr.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = itr.queue.dequeue(); + ASSERT_TRUE(msg); itr.session->forward(std::move(msg)); } @@ -441,8 +441,8 @@ void Test::assertSrc(Client& src) { ASSERT_TRUE(src.waitQueueSize(1)); - Routable::UP msg = src.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = src.queue.dequeue(); + ASSERT_TRUE(msg); } void @@ -485,8 +485,8 @@ Test::debugTrace() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -497,8 +497,8 @@ Test::debugTrace() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -510,21 +510,21 @@ Test::debugTrace() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } client->waitQueueSize(3); - Routable::UP reply = client->queue.dequeue(0); + Routable::UP reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(0); + reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(0); + reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); diff --git a/messagebus/src/tests/resender/resender.cpp b/messagebus/src/tests/resender/resender.cpp index cd7fdbeb6cc..f40dbdc5e4a 100644 --- a/messagebus/src/tests/resender/resender.cpp +++ b/messagebus/src/tests/resender/resender.cpp @@ -3,9 +3,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/routing/errordirective.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> -#include <vespa/messagebus/testlib/custompolicy.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> @@ -32,7 +30,7 @@ StringList::add(const string &str) std::vector<string>::push_back(str); return *this; } -static const double GET_MESSAGE_TIMEOUT = 60.0; +static const duration GET_MESSAGE_TIMEOUT = 60s; //////////////////////////////////////////////////////////////////////////////// // @@ -158,20 +156,20 @@ Test::testRetryTag(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); EXPECT_EQUAL(true, msg->getRetryEnabled()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -183,14 +181,14 @@ Test::testRetryEnabledTag(TestData &data) msg->setRetryEnabled(false); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_EQUAL(false, msg->getRetryEnabled()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -200,16 +198,16 @@ Test::testTransientError(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -219,13 +217,13 @@ Test::testFatalError(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -235,14 +233,14 @@ Test::testDisableRetry(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasErrors()); EXPECT_TRUE(!reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -253,19 +251,19 @@ Test::testRetryDelay(TestData &data) data._retryPolicy->setBaseDelay(0.01); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, -1); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); string trace = reply->getTrace().toString(); EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); @@ -282,19 +280,19 @@ Test::testRequestRetryDelay(TestData &data) data._retryPolicy->setBaseDelay(1); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, i / 50.0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); string trace = reply->getTrace().toString(); EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp index a7a35508656..88db25fa9c7 100644 --- a/messagebus/src/tests/routable/routable.cpp +++ b/messagebus/src/tests/routable/routable.cpp @@ -73,7 +73,7 @@ Test::Main() msg.pushHandler(handler); msg.discard(); - Reply::UP reply = handler.getReply(0); + Reply::UP reply = handler.getReply(duration::zero()); ASSERT_FALSE(reply); } { @@ -86,7 +86,7 @@ Test::Main() reply.swapState(msg); reply.discard(); - Reply::UP ap = handler.getReply(0); + Reply::UP ap = handler.getReply(duration::zero()); ASSERT_FALSE(ap); } diff --git a/messagebus/src/tests/routablequeue/routablequeue.cpp b/messagebus/src/tests/routablequeue/routablequeue.cpp index 4e7c918134f..04e90984f88 100644 --- a/messagebus/src/tests/routablequeue/routablequeue.cpp +++ b/messagebus/src/tests/routablequeue/routablequeue.cpp @@ -40,8 +40,8 @@ Test::Main() { RoutableQueue rq; EXPECT_TRUE(rq.size() == 0); - EXPECT_TRUE(rq.dequeue(0).get() == 0); - EXPECT_TRUE(rq.dequeue(100).get() == 0); + EXPECT_TRUE(rq.dequeue().get() == 0); + EXPECT_TRUE(rq.dequeue(100ms).get() == 0); EXPECT_TRUE(TestMessage::getCnt() == 0); EXPECT_TRUE(TestReply::getCnt() == 0); rq.enqueue(Routable::UP(new TestMessage(101))); @@ -61,16 +61,16 @@ Test::Main() EXPECT_TRUE(TestMessage::getCnt() == 2); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 3); EXPECT_TRUE(r->getType() == 101); } EXPECT_TRUE(TestMessage::getCnt() == 1); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 2); EXPECT_TRUE(r->getType() == 201); } @@ -85,16 +85,16 @@ Test::Main() EXPECT_TRUE(TestMessage::getCnt() == 2); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 3); EXPECT_TRUE(r->getType() == 102); } EXPECT_TRUE(TestMessage::getCnt() == 1); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 2); EXPECT_TRUE(r->getType() == 202); } diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp index 42c5938fe92..48c86a53160 100644 --- a/messagebus/src/tests/routing/routing.cpp +++ b/messagebus/src/tests/routing/routing.cpp @@ -8,7 +8,6 @@ #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> -#include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/component/vtag.h> @@ -485,7 +484,7 @@ private: bool testTrace(TestData &data, const std::vector<string> &expected); bool testTrace(const std::vector<string> &expected, const Trace &trace); - static const double RECEPTOR_TIMEOUT; + static const duration RECEPTOR_TIMEOUT; public: int Main() override; @@ -540,7 +539,7 @@ public: void requireThatDepthLimitCanBeIgnored(TestData &data); }; -const double Test::RECEPTOR_TIMEOUT = 120.0; +const duration Test::RECEPTOR_TIMEOUT = 120s; TEST_APPHOOK(Test); @@ -613,7 +612,7 @@ bool Test::testAcknowledge(TestData &data) { Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } data._dstSession->acknowledge(std::move(msg)); @@ -632,7 +631,7 @@ bool Test::testTrace(TestData &data, const std::vector<string> &expected) { Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } if (!EXPECT_TRUE(!reply->hasErrors())) { @@ -747,7 +746,7 @@ Test::testNoRoutingTable(TestData &data) EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); Message::UP msg = res.getMessage(); - EXPECT_TRUE(msg.get() != NULL); + EXPECT_TRUE(msg); } void @@ -759,7 +758,7 @@ Test::testUnknownRoute(TestData &data) EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); Message::UP msg = res.getMessage(); - EXPECT_TRUE(msg.get() != NULL); + EXPECT_TRUE(msg); } void @@ -767,7 +766,7 @@ Test::testNoRoute(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route()).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -779,10 +778,10 @@ Test::testRecognizeHopName(TestData &data) .addHop(HopSpec("dst", "dst/session")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -794,10 +793,10 @@ Test::testRecognizeRouteDirective(TestData &data) .addHop(HopSpec("dir", "route:dst")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dir")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -808,10 +807,10 @@ Test::testRecognizeRouteName(TestData &data) .addRoute(RouteSpec("dst").addHop("dst/session")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -823,7 +822,7 @@ Test::testHopResolutionOverflow(TestData &data) .addHop(HopSpec("bar", "foo")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -835,7 +834,7 @@ Test::testRouteResolutionOverflow(TestData &data) .addRoute(RouteSpec("foo").addHop("route:foo")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), "foo").isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -847,13 +846,13 @@ Test::testInsertRoute(TestData &data) .addRoute(RouteSpec("foo").addHop("dst/session").addHop("bar")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("route:foo baz")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_EQUAL(2u, msg->getRoute().getNumHops()); EXPECT_EQUAL("bar", msg->getRoute().getHop(0).toString()); EXPECT_EQUAL("baz", msg->getRoute().getHop(1).toString()); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -864,7 +863,7 @@ Test::testErrorDirective(TestData &data) route.getHop(0).setDirective(1, IHopDirective::SP(new ErrorDirective("err"))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); EXPECT_EQUAL("err", reply->getError(0).getMessage()); @@ -879,7 +878,7 @@ Test::testSelectError(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom: ]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); LOG(info, "testSelectError trace=%s", reply->getTrace().toString().c_str()); LOG(info, "testSelectError error=%s", reply->getError(0).toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); @@ -895,7 +894,7 @@ Test::testSelectNone(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_SERVICES_FOR_ROUTE, reply->getError(0).getCode()); } @@ -909,10 +908,10 @@ Test::testSelectOne(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -922,22 +921,22 @@ Test::testResend1(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[APP_TRANSIENT_ERROR @ localhost]: err1") @@ -957,22 +956,22 @@ Test::testResend2(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Source session accepted a 3 byte message. 1 message(s) now pending.") @@ -1022,13 +1021,13 @@ Test::testNoResend(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_TRANSIENT_ERROR, reply->getError(0).getCode()); } @@ -1043,16 +1042,16 @@ Test::testSelectOnResend(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1075,16 +1074,16 @@ Test::testNoSelectOnResend(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1107,10 +1106,10 @@ Test::testCanConsumeError(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session,dst/unknown]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_TRUE(testTrace(StringList() @@ -1131,7 +1130,7 @@ Test::testCantConsumeError(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/unknown]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); @@ -1152,10 +1151,10 @@ Test::testNestedPolicies(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); } @@ -1173,10 +1172,10 @@ Test::testRemoveReply(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[NO_ADDRESS_FOR_SERVICE @ localhost]") @@ -1197,10 +1196,10 @@ Test::testSetReply(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Select:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1221,16 +1220,16 @@ Test::testResendSetAndReuseReply(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[ReuseReply:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "dst")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -1250,10 +1249,10 @@ Test::testResendSetAndRemoveReply(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[RemoveReply:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1271,13 +1270,13 @@ Test::testHopIgnoresReply(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("?dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1291,13 +1290,13 @@ Test::testHopBlueprintIgnoresReply(TestData &data) .addHop(HopSpec("foo", "dst/session").setIgnoreResult(true)))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1309,12 +1308,12 @@ Test::testAcceptEmptyRoute(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); const Route &route = msg->getRoute(); EXPECT_EQUAL(0u, route.getNumHops()); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); } void @@ -1333,7 +1332,7 @@ Test::testAbortOnlyActiveNodes(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::SEND_ABORTED, reply->getError(1).getCode()); @@ -1344,7 +1343,7 @@ Test::testUnknownPolicy(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Unknown]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::UNKNOWN_POLICY, reply->getError(0).getCode()); } @@ -1362,7 +1361,7 @@ Test::testSelectException(TestData &data) Route::parse("[SelectException]")) .isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1383,10 +1382,10 @@ Test::testMergeException(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route) .isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1423,7 +1422,7 @@ Test::requireThatIgnoreFlagIsSerializedWithMessage(TestData &data) { ASSERT_TRUE(testSend(data, "dst/session foo ?bar")); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Route route = msg->getRoute(); EXPECT_EQUAL(2u, route.getNumHops()); Hop hop = route.getHop(0); @@ -1533,10 +1532,10 @@ Test::testTimeout(TestData &data) { data._retryPolicy->setEnabled(true); data._retryPolicy->setBaseDelay(0.01); - data._srcSession->setTimeout(0.5); + data._srcSession->setTimeout(500ms); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/unknown")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(1).getCode()); diff --git a/messagebus/src/tests/routingcontext/routingcontext.cpp b/messagebus/src/tests/routingcontext/routingcontext.cpp index fa0d8ed6536..1c971b29ee3 100644 --- a/messagebus/src/tests/routingcontext/routingcontext.cpp +++ b/messagebus/src/tests/routingcontext/routingcontext.cpp @@ -23,7 +23,7 @@ using namespace mbus; using vespalib::make_string; -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; class StringList : public std::vector<string> { public: diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp index c4fc0f908e0..aed4de7b228 100644 --- a/messagebus/src/tests/sendadapter/sendadapter.cpp +++ b/messagebus/src/tests/sendadapter/sendadapter.cpp @@ -59,7 +59,7 @@ public: bool start(); }; -static const int TIMEOUT_SECS = 6; +static const duration TIMEOUT_SECS = 6s; TestData::TestData() : _slobrok(), @@ -121,7 +121,7 @@ testVersionedSend(TestData &data, return false; } msg = data._itrHandler.getMessage(TIMEOUT_SECS); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } LOG(info, "Message version %s serialized at source.", @@ -138,7 +138,7 @@ testVersionedSend(TestData &data, } data._itrSession->forward(std::move(msg)); msg = data._dstHandler.getMessage(TIMEOUT_SECS); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } LOG(info, "Message version %s serialized at intermediate.", @@ -157,7 +157,7 @@ testVersionedSend(TestData &data, reply->swapState(*msg); data._dstSession->reply(std::move(reply)); reply = data._itrHandler.getReply(); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } LOG(info, "Reply version %s serialized at destination.", @@ -173,7 +173,7 @@ testVersionedSend(TestData &data, } data._itrSession->forward(std::move(reply)); reply = data._srcHandler.getReply(); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } LOG(info, "Reply version %s serialized at intermediate.", diff --git a/messagebus/src/tests/sequencer/sequencer.cpp b/messagebus/src/tests/sequencer/sequencer.cpp index 218c1e43929..d347c3855cb 100644 --- a/messagebus/src/tests/sequencer/sequencer.cpp +++ b/messagebus/src/tests/sequencer/sequencer.cpp @@ -21,7 +21,7 @@ struct MyQueue : public RoutableQueue { virtual ~MyQueue() { while (size() > 0) { - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); obj->getCallStack().discard(); } } @@ -31,14 +31,14 @@ struct MyQueue : public RoutableQueue { LOG(error, "checkReply(): No reply in queue."); return false; } - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); if (!obj->isReply()) { LOG(error, "checkReply(): Got message when expecting reply."); return false; } Reply::UP reply(static_cast<Reply*>(obj.release())); Message::UP msg = reply->getMessage(); - if (msg.get() == NULL) { + if ( ! msg) { LOG(error, "checkReply(): Reply has no message attached."); return false; } @@ -64,7 +64,7 @@ struct MyQueue : public RoutableQueue { } void replyNext() { - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); Message::UP msg(static_cast<Message*>(obj.release())); Reply::UP reply(new EmptyReply()); diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index 1706da3b55f..e415622707f 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -30,7 +30,7 @@ public: } }; -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; TEST_APPHOOK(Test); @@ -77,11 +77,11 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe() SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams() .setThrottlePolicy(IThrottlePolicy::SP()) .setReplyHandler(srcHandler)); - ASSERT_TRUE(srcSession.get() != NULL); + ASSERT_TRUE(srcSession); ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); ASSERT_TRUE(srcSession->send(std::move(msg), "dst/session", true).isAccepted()); msg = dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } dstSession->acknowledge(std::move(msg)); } diff --git a/messagebus/src/tests/sourcesession/sourcesession.cpp b/messagebus/src/tests/sourcesession/sourcesession.cpp index 5177cf0e799..c793dd435c8 100644 --- a/messagebus/src/tests/sourcesession/sourcesession.cpp +++ b/messagebus/src/tests/sourcesession/sourcesession.cpp @@ -102,11 +102,11 @@ Test::testSequencing() FastOS_Thread::Sleep(250); EXPECT_TRUE(waitQueueSize(dstQ, 2)); EXPECT_TRUE(waitQueueSize(srcQ, 0)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); EXPECT_TRUE(waitQueueSize(srcQ, 2)); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); } @@ -137,7 +137,7 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); { - Routable::UP r = dstQ.dequeue(0); + Routable::UP r = dstQ.dequeue(); Reply::UP reply(new EmptyReply()); r->swapState(*reply); reply->addError(Error(ErrorCode::FATAL_ERROR, "error")); @@ -153,7 +153,7 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); { - Routable::UP r = dstQ.dequeue(0); + Routable::UP r = dstQ.dequeue(); Reply::UP reply(new EmptyReply()); r->swapState(*reply); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "error")); @@ -161,12 +161,12 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); EXPECT_TRUE(waitQueueSize(srcQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 2)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); { - string trace1 = srcQ.dequeue(0)->getTrace().toString(); - string trace2 = srcQ.dequeue(0)->getTrace().toString(); + string trace1 = srcQ.dequeue()->getTrace().toString(); + string trace2 = srcQ.dequeue()->getTrace().toString(); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace1.c_str()); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace2.c_str()); } @@ -202,7 +202,7 @@ Test::testResendConnDown() msg->getTrace().setLevel(9); EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dst2Q, 1)); - Routable::UP obj = dst2Q.dequeue(0); + Routable::UP obj = dst2Q.dequeue(); obj->discard(); src.mb.setupRouting(RoutingSpec().addTable(RoutingTableSpec(SimpleProtocol::NAME) .addHop(HopSpec("dst", "dst/session")))); @@ -210,11 +210,11 @@ Test::testResendConnDown() ASSERT_TRUE(waitQueueSize(dstQ, 1)); // fails ASSERT_TRUE(waitQueueSize(srcQ, 0)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); - string trace = srcQ.dequeue(0)->getTrace().toString(); + string trace = srcQ.dequeue()->getTrace().toString(); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str()); } @@ -240,7 +240,7 @@ Test::testIllegalRoute() ASSERT_TRUE(waitQueueSize(srcQ, 1)); { while (srcQ.size() > 0) { - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); ASSERT_TRUE(routable->isReply()); Reply::UP r(static_cast<Reply*>(routable.release())); EXPECT_EQUAL(1u, r->getNumErrors()); @@ -272,7 +272,7 @@ Test::testNoServices() ASSERT_TRUE(waitQueueSize(srcQ, 1)); { while (srcQ.size() > 0) { - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); ASSERT_TRUE(routable->isReply()); Reply::UP r(static_cast<Reply*>(routable.release())); EXPECT_TRUE(r->getNumErrors() == 1); @@ -300,7 +300,7 @@ Test::testBlockingClose() EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo")), "dst").isAccepted()); ss->close(); srcQ.handleMessage(Message::UP(new SimpleMessage("bogus"))); - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); EXPECT_TRUE(routable->isReply()); } diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp index 5d3525e8ba6..6599604bf9a 100644 --- a/messagebus/src/tests/throttling/throttling.cpp +++ b/messagebus/src/tests/throttling/throttling.cpp @@ -3,7 +3,6 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/messagebus/destinationsession.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/routablequeue.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> #include <vespa/messagebus/routing/routingspec.h> @@ -12,7 +11,6 @@ #include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> #include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/testserver.h> @@ -141,15 +139,15 @@ Test::testMaxPendingCount() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); @@ -158,11 +156,11 @@ Test::testMaxPendingCount() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 8)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); } @@ -202,17 +200,17 @@ Test::testMaxPendingSize() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 2)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 2)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("12")), "dst").isAccepted()); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); } @@ -244,7 +242,7 @@ Test::testMinOne() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(waitQueueSize(dstQ, 0)); } diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp index b2631e13d9c..980681a9d42 100644 --- a/messagebus/src/tests/timeout/timeout.cpp +++ b/messagebus/src/tests/timeout/timeout.cpp @@ -76,7 +76,7 @@ Test::testMessageExpires() EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); - Message::UP msg = dstHandler.getMessage(1); + Message::UP msg = dstHandler.getMessage(1s); if (msg) { msg->discard(); } diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h index df25bf17973..11594fcdc22 100644 --- a/messagebus/src/vespa/messagebus/common.h +++ b/messagebus/src/vespa/messagebus/common.h @@ -2,17 +2,14 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <chrono> +#include <vespa/vespalib/util/time.h> namespace mbus { // Decide the type of string used once using string = vespalib::string; - -using seconds = std::chrono::duration<double>; -using milliseconds = std::chrono::milliseconds; - - +using duration = vespalib::duration; +using time_point = vespalib::steady_clock::time_point; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp index c9fb03dd013..305ffb06aa0 100644 --- a/messagebus/src/vespa/messagebus/message.cpp +++ b/messagebus/src/vespa/messagebus/message.cpp @@ -10,7 +10,6 @@ #include <vespa/log/log.h> LOG_SETUP(".message"); -using namespace std::chrono; namespace mbus { Message::Message() : @@ -58,14 +57,14 @@ Message::swapState(Routable &rhs) Message & Message::setTimeReceivedNow() { - _timeReceived = steady_clock::now(); + _timeReceived = vespalib::steady_clock::now(); return *this; } -milliseconds +duration Message::getTimeRemainingNow() const { - return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived)); + return std::max(0ns, _timeRemaining - (vespalib::steady_clock::now() - _timeReceived)); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index 3069e45e6d8..15e7384707c 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -3,7 +3,6 @@ #include "routable.h" #include <vespa/messagebus/routing/route.h> -#include <chrono> namespace mbus { @@ -55,7 +54,7 @@ public: * * @return The remaining time in milliseconds. */ - milliseconds getTimeRemaining() const { return _timeRemaining; } + duration getTimeRemaining() const { return _timeRemaining; } /** * Sets the numer of milliseconds that remain before this message times @@ -65,7 +64,7 @@ public: * @param timeRemaining The number of milliseconds until expiration. * @return This, to allow chaining. */ - Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; } + Message &setTimeRemaining(duration timeRemaining) { _timeRemaining = timeRemaining; return *this; } /** * Returns the number of milliseconds that remain right now before this @@ -79,7 +78,7 @@ public: * * @return The remaining time in milliseconds. */ - milliseconds getTimeRemainingNow() const; + duration getTimeRemainingNow() const; /** * Access the route associated with this message. @@ -185,7 +184,7 @@ public: private: Route _route; time_point _timeReceived; - milliseconds _timeRemaining; + duration _timeRemaining; bool _retryEnabled; uint32_t _retry; }; diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h index 037298cf7c0..1777b9e69f5 100644 --- a/messagebus/src/vespa/messagebus/network/inetwork.h +++ b/messagebus/src/vespa/messagebus/network/inetwork.h @@ -62,7 +62,7 @@ public: * @param seconds The timeout. * @return True if ready. */ - virtual bool waitUntilReady(seconds timeout) const = 0; + virtual bool waitUntilReady(duration timeout) const = 0; /** * Register a session name with the network layer. This will make the diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 5ae6b07c3fa..0bc7f9f3399 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -225,12 +225,12 @@ RPCNetwork::start() } bool -RPCNetwork::waitUntilReady(seconds timeout) const +RPCNetwork::waitUntilReady(duration timeout) const { slobrok::api::SlobrokList brokerList; slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList); bool hasConfig = false; - for (uint32_t i = 0; i < timeout.count() * 100; ++i) { + for (int64_t i = 0; i < vespalib::count_ms(timeout)/10; ++i) { if (configurator->poll()) { hasConfig = true; } @@ -240,10 +240,10 @@ RPCNetwork::waitUntilReady(seconds timeout) const std::this_thread::sleep_for(10ms); } if (! hasConfig) { - LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count()); + LOG(error, "failed to get config for slobroks in %2.2f seconds", vespalib::to_s(timeout)); } else if (! _mirror->ready()) { auto brokers = brokerList.logString(); - LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count()); + LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), vespalib::to_s(timeout)); } return false; } @@ -320,7 +320,7 @@ void RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients) { SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self - seconds timeout = ctx._msg.getTimeRemainingNow(); + duration timeout = ctx._msg.getTimeRemainingNow(); for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; @@ -373,7 +373,7 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str())); } else { - std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow(); + duration timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); RPCSendAdapter *adapter = getSendAdapter(ctx._version); if (adapter == nullptr) { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 169bdd86dd9..a6c2724929d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -219,7 +219,7 @@ public: void attach(INetworkOwner &owner) override; const string getConnectionSpec() const override; bool start() override; - bool waitUntilReady(seconds timout) const override; + bool waitUntilReady(duration timout) const override; void registerSession(const string &session) override; void unregisterSession(const string &session) override; bool allocServiceAddress(RoutingNode &recipient) override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index e0fae8eabd6..2422638dc05 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -98,22 +98,22 @@ RPCSend::handleDiscard(Context ctx) } void -RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining) +RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, duration timeRemaining) { send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); } void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining) +RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, duration timeRemaining) { send(recipient, version, FillByCopy(payload), timeRemaining); } void RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & payload, milliseconds timeRemaining) + const PayLoadFiller & payload, duration timeRemaining) { - SendContext::UP ctx(new SendContext(recipient, timeRemaining)); + auto ctx = std::make_unique<SendContext>(recipient, timeRemaining); RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); const Message &msg = recipient.getMessage(); Route route = recipient.getRoute(); @@ -126,7 +126,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.", version.toString().c_str(), _clientIdent.c_str(), - address.getServiceName().c_str(), ctx->getTimeout().count())); + address.getServiceName().c_str(), vespalib::to_s(ctx->getTimeout()))); } if (hop.getIgnoreResult()) { @@ -135,13 +135,13 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str())); } - Reply::UP reply(new EmptyReply()); + auto reply = std::make_unique<EmptyReply>(); reply->getTrace().swap(ctx->getTrace()); _net->getOwner().deliverReply(std::move(reply), recipient); } else { SendContext *ptr = ctx.release(); req->SetContext(FNET_Context(ptr)); - address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout().count(), this); + address.getTarget().getFRTTarget().InvokeAsync(req, vespalib::to_s(ptr->getTimeout()), this); } } @@ -159,12 +159,12 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) { Error error; Trace & trace = ctx->getTrace(); if (!req->CheckReturnTypes(getReturnSpec())) { - reply.reset(new EmptyReply()); + reply = std::make_unique<EmptyReply>(); switch (req->GetErrorCode()) { case FRTE_RPC_TIMEOUT: error = Error(ErrorCode::TIMEOUT, make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s", - serviceName.c_str(), ctx->getTimeout().count(), req->GetErrorMessage())); + serviceName.c_str(), vespalib::to_s(ctx->getTimeout()), req->GetErrorMessage())); break; case FRTE_RPC_CONNECTION: error = Error(ErrorCode::CONNECTION_ERROR, diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index e7bf5495974..f3a9177d236 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -44,7 +44,7 @@ public: virtual uint32_t getTraceLevel() const = 0; virtual bool useRetry() const = 0; virtual uint32_t getRetries() const = 0; - virtual milliseconds getRemainingTime() const = 0; + virtual duration getRemainingTime() const = 0; virtual vespalib::stringref getRoute() const = 0; virtual vespalib::stringref getSession() const = 0; virtual BlobRef getPayload() const = 0; @@ -59,13 +59,13 @@ protected: Error & error, vespalib::TraceNode & rootTrace) const = 0; virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const = 0; + const PayLoadFiller &filler, duration timeRemaining) const = 0; virtual const char * getReturnSpec() const = 0; virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0; virtual std::unique_ptr<Params> toParams(const FRT_Values ¶m) const = 0; void send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & filler, milliseconds timeRemaining); + const PayLoadFiller & filler, duration timeRemaining); std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version, BlobRef payload, Error & error) const; /** @@ -89,9 +89,9 @@ private: void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, milliseconds timeRemaining) final override; + Blob payload, duration timeRemaining) final override; void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, milliseconds timeRemaining) final override; + BlobRef payload, duration timeRemaining) final override; void RequestDone(FRT_RPCRequest *req) final override; void handleReply(std::unique_ptr<Reply> reply) final override; }; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h index 0b620b3b11f..0e299366e77 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h @@ -14,18 +14,18 @@ public: using UP = std::unique_ptr<SendContext>; SendContext(const SendContext &) = delete; SendContext & operator = (const SendContext &) = delete; - SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining) + SendContext(mbus::RoutingNode &recipient, duration timeRemaining) : _recipient(recipient), _trace(recipient.getTrace().getLevel()), _timeout(timeRemaining) { } mbus::RoutingNode &getRecipient() { return _recipient; } mbus::Trace &getTrace() { return _trace; } - seconds getTimeout() { return _timeout; } + duration getTimeout() { return _timeout; } private: mbus::RoutingNode &_recipient; mbus::Trace _trace; - seconds _timeout; + duration _timeout; }; /** diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h index cc89bb022b9..15b63c3117c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h @@ -43,7 +43,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, milliseconds timeRemaining) = 0; + BlobRef payload, duration timeRemaining) = 0; /** * Performs the actual sending to the given recipient. @@ -54,7 +54,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, milliseconds timeRemaining) = 0; + Blob payload, duration timeRemaining) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp index e902aa20965..388ab3309c4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp @@ -4,7 +4,6 @@ #include "rpcnetwork.h" #include "rpcserviceaddress.h" #include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/tracelevel.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/frt/reflection.h> @@ -59,7 +58,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder) void RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const + const PayLoadFiller &filler, duration timeRemaining) const { FRT_Values &args = *req.GetParams(); @@ -69,7 +68,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, args.AddString(address.getSessionName().c_str()); args.AddInt8(msg.getRetryEnabled() ? 1 : 0); args.AddInt32(msg.getRetry()); - args.AddInt64(timeRemaining.count()); + args.AddInt64(vespalib::count_ms(timeRemaining)); args.AddString(msg.getProtocol().c_str()); filler.fill(args); args.AddInt32(traceLevel); @@ -85,7 +84,7 @@ public: uint32_t getTraceLevel() const override { return _args[8]._intval32; } bool useRetry() const override { return _args[3]._intval8 != 0; } uint32_t getRetries() const override { return _args[4]._intval32; } - milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); } + duration getRemainingTime() const override { return std::chrono::milliseconds(_args[5]._intval64); } vespalib::Version getVersion() const override { return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); @@ -135,7 +134,7 @@ RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error reply = decode(protocolName, version, payload, error); } if ( ! reply ) { - reply.reset(new EmptyReply()); + reply = std::make_unique<EmptyReply>(); } reply->setRetryDelay(retryDelay); for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 3265c304830..249acc50e0c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const override; + const PayLoadFiller &filler, duration timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp index 3e453bb60eb..3b0c10500b9 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp @@ -101,7 +101,7 @@ private: void RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const + const PayLoadFiller &filler, duration timeRemaining) const { FRT_Values &args = *req.GetParams(); req.SetMethodName(METHOD_NAME); @@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout root.setString(SESSION_F, address.getSessionName()); root.setBool(USERETRY_F, msg.getRetryEnabled()); root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMELEFT_F, timeRemaining.count()); + root.setLong(TIMELEFT_F, vespalib::count_ms(timeRemaining)); root.setString(PROTOCOL_F, msg.getProtocol()); root.setLong(TRACELEVEL_F, traceLevel); filler.fill(BLOB_F, root); @@ -156,7 +156,7 @@ public: uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); } bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); } uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); } - milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); } + duration getRemainingTime() const override { return std::chrono::milliseconds(_slime.get()[TIMELEFT_F].asLong()); } Version getVersion() const override { return Version(_slime.get()[VERSION_F].asString().make_stringref()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h index 939c37c81b2..c48aa90a9fb 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const override; + const PayLoadFiller &filler, duration timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index bb3bf1d172d..63470b6b707 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -23,7 +23,7 @@ RPCTarget::~RPCTarget() } void -RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler) +RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) { bool hasVersion = false; bool shouldInvoke = false; @@ -47,7 +47,7 @@ RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler) } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); req->SetMethodName("mbus.getVersion"); - _target.InvokeAsync(req, timeout.count(), this); + _target.InvokeAsync(req, vespalib::to_s(timeout), this); } } diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index 9c089381de7..b6488f25cb7 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -88,7 +88,7 @@ public: * @param timeout The timeout for the request in milliseconds. * @param handler The handler to be called once the version is available. */ - void resolveVersion(seconds timeout, IVersionHandler &handler); + void resolveVersion(duration timeout, IVersionHandler &handler); /** * @return true if the FRT target is valid or has been invoked (which diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp index eb2d93c6688..121056044ba 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.cpp +++ b/messagebus/src/vespa/messagebus/routablequeue.cpp @@ -39,17 +39,17 @@ RoutableQueue::enqueue(Routable::UP r) } Routable::UP -RoutableQueue::dequeue(uint32_t msTimeout) +RoutableQueue::dequeue(duration msTimeout) { steady_clock::time_point startTime = steady_clock::now(); - uint64_t msLeft = msTimeout; + duration msLeft = msTimeout; vespalib::MonitorGuard guard(_monitor); - while (_queue.size() == 0 && msLeft > 0) { + while (_queue.size() == 0 && msLeft > duration::zero()) { if (!guard.wait(msLeft) || _queue.size() > 0) { break; } - uint64_t elapsed = duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - msLeft = (elapsed > msTimeout) ? 0 : msTimeout - elapsed; + duration elapsed = (steady_clock::now() - startTime); + msLeft = (elapsed > msTimeout) ? duration::zero() : msTimeout - elapsed; } if (_queue.size() == 0) { return Routable::UP(); diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h index c0ed35dece8..153686b1669 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.h +++ b/messagebus/src/vespa/messagebus/routablequeue.h @@ -69,7 +69,8 @@ public: * @return the dequeued routable * @param msTimeout how long to wait if the queue is empty **/ - Routable::UP dequeue(uint32_t msTimeout); + Routable::UP dequeue(duration timeout); + Routable::UP dequeue() { return dequeue(duration::zero());} /** * Handle a Message by enqueuing it. diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 41fe01625ae..eece44a922a 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -9,8 +9,6 @@ using vespalib::make_string; -using namespace std::chrono_literals; -using namespace std::chrono; namespace mbus { @@ -75,7 +73,7 @@ SourceSession::send(Message::UP msg) { msg->setTimeReceivedNow(); if (msg->getTimeRemaining() == 0ms) { - msg->setTimeRemaining(duration_cast<milliseconds>(_timeout)); + msg->setTimeRemaining(_timeout); } { vespalib::MonitorGuard guard(_monitor); @@ -145,10 +143,10 @@ SourceSession::close() } SourceSession & -SourceSession::setTimeout(double timeout) +SourceSession::setTimeout(duration timeout) { vespalib::MonitorGuard guard(_monitor); - _timeout = seconds(timeout); + _timeout = timeout; return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 31ebec3555e..0992a3e377b 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -27,7 +27,7 @@ private: Sequencer _sequencer; IReplyHandler &_replyHandler; IThrottlePolicy::SP _throttlePolicy; - seconds _timeout; + duration _timeout; uint32_t _pendingCount; bool _closed; bool _done; @@ -120,7 +120,7 @@ public: * @param timeout The numer of seconds allowed. * @return This, to allow chaining. */ - SourceSession &setTimeout(double timeout); + SourceSession &setTimeout(duration timeout); }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp index e7a99f2a1de..5b0c920f138 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp @@ -7,8 +7,8 @@ namespace mbus { SourceSessionParams::SourceSessionParams() : _replyHandler(nullptr), - _throttlePolicy(new DynamicThrottlePolicy()), - _timeout(180.0) + _throttlePolicy(std::make_shared<DynamicThrottlePolicy>()), + _timeout(180s) { } IThrottlePolicy::SP @@ -20,18 +20,12 @@ SourceSessionParams::getThrottlePolicy() const SourceSessionParams & SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy) { - _throttlePolicy = throttlePolicy; + _throttlePolicy = std::move(throttlePolicy); return *this; } -seconds -SourceSessionParams::getTimeout() const -{ - return _timeout; -} - SourceSessionParams & -SourceSessionParams::setTimeout(seconds timeout) +SourceSessionParams::setTimeout(duration timeout) { _timeout = timeout; return *this; diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h index 9ee17280d40..588b13a1bd8 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.h +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h @@ -3,7 +3,6 @@ #include "ireplyhandler.h" #include "ithrottlepolicy.h" -#include <chrono> namespace mbus { @@ -18,7 +17,7 @@ class SourceSessionParams { private: IReplyHandler *_replyHandler; IThrottlePolicy::SP _throttlePolicy; - seconds _timeout; + duration _timeout; public: /** @@ -46,14 +45,14 @@ public: * * @return The total timeout parameter. */ - seconds getTimeout() const; + duration getTimeout() const { return _timeout; } /** * Returns the number of seconds a message can spend trying to succeed. * * @return The timeout in seconds. */ - SourceSessionParams &setTimeout(seconds timeout); + SourceSessionParams &setTimeout(duration timeout); /** * Returns whether or not a reply handler has been assigned to this. diff --git a/messagebus/src/vespa/messagebus/steadytimer.cpp b/messagebus/src/vespa/messagebus/steadytimer.cpp index f64c8f361cd..942a1f4f051 100644 --- a/messagebus/src/vespa/messagebus/steadytimer.cpp +++ b/messagebus/src/vespa/messagebus/steadytimer.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "steadytimer.h" -#include <chrono> +#include <vespa/vespalib/util/time.h> using namespace std::chrono; @@ -9,7 +9,7 @@ namespace mbus { uint64_t SteadyTimer::getMilliTime() const { - return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count(); + return vespalib::count_ms(steady_clock::now().time_since_epoch()); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp index f98a4be05c3..01d644bba09 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp +++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp @@ -26,14 +26,13 @@ Receptor::handleReply(Reply::UP reply) } Message::UP -Receptor::getMessage(double maxWait) +Receptor::getMessage(duration maxWait) { - int64_t ms = (int64_t)(maxWait * 1000); steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_msg.get() == 0) { - int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - if (w <= 0 || !guard.wait(w)) { + duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); + if (w <= duration::zero() || !guard.wait(w)) { break; } } @@ -41,14 +40,13 @@ Receptor::getMessage(double maxWait) } Reply::UP -Receptor::getReply(double maxWait) +Receptor::getReply(duration maxWait) { - int64_t ms = (int)(maxWait * 1000); steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_reply.get() == 0) { - int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - if (w <= 0 || !guard.wait(w)) { + duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); + if (w <= duration::zero() || !guard.wait(w)) { break; } } diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index ffc637fa90d..1d98ac62cd2 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -25,8 +25,10 @@ public: ~Receptor(); void handleMessage(Message::UP msg) override; void handleReply(Reply::UP reply) override; - Message::UP getMessage(double maxWait = 120.0); - Reply::UP getReply(double maxWait = 120.0); + Message::UP getMessage(duration maxWait = 120s); + Reply::UP getReply(duration maxWait = 120s); + Message::UP getMessageNow() { return getMessage(duration::zero()); } + Reply::UP getReplyNow() { return getReply(duration::zero()); } }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp index bbd23d52c0b..fe5baab3e40 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp +++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp @@ -4,6 +4,7 @@ #include "slobrok.h" #include "slobrokstate.h" #include <vespa/vespalib/component/vtag.h> +#include <thread> namespace mbus { |