diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-03 21:45:53 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-04 18:26:06 +0000 |
commit | b8e151a435ccec1ecc03d98bac5b59f4f14514be (patch) | |
tree | 140efda301a7e5adc407c44061ba5b0bb41dd212 | |
parent | 7700f411ea6f4a3e7c0599fae239ec84c18c0038 (diff) |
timeout as duration
Conflicts:
messagebus/src/vespa/messagebus/testlib/testserver.cpp
84 files changed, 559 insertions, 596 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 93c5d51fef5..3dbc9dd7e69 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -92,8 +92,10 @@ public: TEST_APPHOOK(Test); -Test::Test() {} -Test::~Test() {} +Test::Test() = default; +Test::~Test() = default; + +const vespalib::duration TIMEOUT = 600s; int Test::Main() { @@ -230,7 +232,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } EXPECT_EQUAL(servers.size(), lst.size()); for (uint32_t i = 0; i < servers.size(); ++i) { @@ -332,14 +334,14 @@ Test::testExternSend() msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = ir.getMessage(600))); + ASSERT_TRUE((msg = ir.getMessage(TIMEOUT))); is->forward(std::move(msg)); - ASSERT_TRUE((msg = dr.getMessage(600))); + ASSERT_TRUE((msg = dr.getMessage(TIMEOUT))); ds->acknowledge(std::move(msg)); - mbus::Reply::UP reply = ir.getReply(600); + mbus::Reply::UP reply = ir.getReply(TIMEOUT); ASSERT_TRUE(reply); is->forward(std::move(reply)); - ASSERT_TRUE((reply = sr.getReply(600))); + ASSERT_TRUE((reply = sr.getReply(TIMEOUT))); fprintf(stderr, "%s", reply->getTrace().toString().c_str()); } @@ -366,9 +368,9 @@ Test::testExternMultipleSlobroks() mbus::Message::UP msg(new GetDocumentMessage(DocumentId("id:ns:testdoc::"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = dr.getMessage(600))); + ASSERT_TRUE((msg = dr.getMessage(TIMEOUT))); ds->acknowledge(std::move(msg)); - mbus::Reply::UP reply = sr.getReply(600); + mbus::Reply::UP reply = sr.getReply(TIMEOUT); ASSERT_TRUE(reply); } { @@ -382,9 +384,9 @@ Test::testExternMultipleSlobroks() mbus::Message::UP msg(new GetDocumentMessage(DocumentId("id:ns:testdoc::"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); - ASSERT_TRUE((msg = dr.getMessage(600))); + ASSERT_TRUE((msg = dr.getMessage(TIMEOUT))); ds->acknowledge(std::move(msg)); - mbus::Reply::UP reply = sr.getReply(600); + mbus::Reply::UP reply = sr.getReply(TIMEOUT); ASSERT_TRUE(reply); } } @@ -410,7 +412,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } EXPECT_EQUAL(10u, lst.size()); @@ -423,7 +425,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } EXPECT_EQUAL(1u, lst.size()); EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin()); @@ -466,8 +468,8 @@ Test::testLocalServiceCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600)); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); + ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT)); } void @@ -543,8 +545,8 @@ Test::testRoundRobinCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600)); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); + ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT)); } void @@ -573,7 +575,7 @@ Test::multipleGetRepliesAreMergedToFoundDocument() mbus::Reply::UP reply(new GetDocumentReply(std::move(doc))); selected[i]->handleReply(std::move(reply)); } - mbus::Reply::UP reply = frame.getReceptor().getReply(600); + mbus::Reply::UP reply = frame.getReceptor().getReply(TIMEOUT); EXPECT_TRUE(reply); EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType()); EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified()); @@ -647,7 +649,7 @@ Test::testDocumentRouteSelectorIgnore() make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff")))); std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 0)); - mbus::Reply::UP reply = frame.getReceptor().getReply(600); + mbus::Reply::UP reply = frame.getReceptor().getReply(TIMEOUT); ASSERT_TRUE(reply); EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType()); EXPECT_EQUAL(0u, reply->getNumErrors()); @@ -940,7 +942,7 @@ Test::testSubsetService() ASSERT_TRUE(frame.select(leaf, 1)); lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } ASSERT_TRUE(lst.size() > 1); // must have requeried @@ -959,7 +961,7 @@ Test::testSubsetService() prev = next; leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } // Test requerying for dropping nodes. @@ -976,7 +978,7 @@ Test::testSubsetService() mbus::Reply::UP reply(new mbus::EmptyReply()); reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route)); leaf[0]->handleReply(std::move(reply)); - ASSERT_TRUE(frame.getReceptor().getReply(600)); + ASSERT_TRUE(frame.getReceptor().getReply(TIMEOUT)); } EXPECT_EQUAL(10u, lst.size()); @@ -1018,8 +1020,8 @@ Test::testSubsetServiceCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600)); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); + ASSERT_TRUE(barFrame.getReceptor().getReply(TIMEOUT)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(TIMEOUT)); } bool @@ -1034,7 +1036,7 @@ Test::trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> } else { frame.select(leaf, 0); } - if( ! frame.getReceptor().getReply(600)) { + if( ! frame.getReceptor().getReply(TIMEOUT)) { LOG(error, "Reply failed to propagate to reply handler."); return false; } diff --git a/documentapi/src/tests/policies/testframe.cpp b/documentapi/src/tests/policies/testframe.cpp index 4cdc5d4ba14..9834e534a56 100644 --- a/documentapi/src/tests/policies/testframe.cpp +++ b/documentapi/src/tests/policies/testframe.cpp @@ -122,7 +122,7 @@ TestFrame::testSelect(const std::vector<string> &expected) } node->handleReply(std::make_unique<mbus::EmptyReply>()); } - if (_handler.getReply(600).get() == nullptr) { + if (_handler.getReply(600s).get() == nullptr) { LOG(error, "Reply not propagated to handler."); return false; } @@ -166,7 +166,7 @@ TestFrame::testMerge(const ReplyMap &replies, node->handleReply(std::move(ret)); } - mbus::Reply::UP reply = _handler.getReply(600); + mbus::Reply::UP reply = _handler.getReply(600s); if (reply.get() == nullptr) { LOG(error, "Reply not propagated to handler."); return false; diff --git a/documentapi/src/tests/policyfactory/policyfactory.cpp b/documentapi/src/tests/policyfactory/policyfactory.cpp index 877ade22e2a..729818c5c4a 100644 --- a/documentapi/src/tests/policyfactory/policyfactory.cpp +++ b/documentapi/src/tests/policyfactory/policyfactory.cpp @@ -73,6 +73,8 @@ createMessage() TEST_SETUP(Test); +const vespalib::duration TIMEOUT = 600s; + int Test::Main() { @@ -89,7 +91,7 @@ Test::Main() mbus::Route route = mbus::Route::parse("[MyPolicy]"); ASSERT_TRUE(src->send(createMessage(), route).isAccepted()); - mbus::Reply::UP reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(600); + mbus::Reply::UP reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(TIMEOUT); ASSERT_TRUE(reply); fprintf(stderr, "%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); @@ -101,7 +103,7 @@ Test::Main() protocol->putRoutingPolicyFactory("MyPolicy", std::make_shared<MyFactory>()); ASSERT_TRUE(src->send(createMessage(), route).isAccepted()); - reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(600); + reply = static_cast<mbus::Receptor&>(src->getReplyHandler()).getReply(TIMEOUT); ASSERT_TRUE(reply); fprintf(stderr, "%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); diff --git a/documentapi/src/tests/routablefactory/routablefactory.cpp b/documentapi/src/tests/routablefactory/routablefactory.cpp index 3f94d120d66..32a36ef0b59 100644 --- a/documentapi/src/tests/routablefactory/routablefactory.cpp +++ b/documentapi/src/tests/routablefactory/routablefactory.cpp @@ -167,6 +167,8 @@ Test::Main() // /////////////////////////////////////////////////////////////////////////////// +const vespalib::duration TIMEOUT = 600s; + void Test::testFactory(TestData &data) { @@ -174,8 +176,8 @@ Test::testFactory(TestData &data) // Source should fail to encode the message. EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted()); - mbus::Reply::UP reply = data._srcHandler.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + mbus::Reply::UP reply = data._srcHandler.getReply(TIMEOUT); + ASSERT_TRUE(reply); fprintf(stderr, "%s\n", reply->getTrace().toString().c_str()); ASSERT_TRUE(reply->hasErrors()); EXPECT_EQUAL((uint32_t)mbus::ErrorCode::ENCODE_ERROR, reply->getError(0).getCode()); @@ -185,8 +187,8 @@ Test::testFactory(TestData &data) data._srcProtocol->putRoutableFactory(MyMessage::TYPE, IRoutableFactory::SP(new MyMessageFactory()), vespalib::VersionSpecification()); EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted()); - reply = data._srcHandler.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + reply = data._srcHandler.getReply(TIMEOUT); + ASSERT_TRUE(reply); fprintf(stderr, "%s\n", reply->getTrace().toString().c_str()); EXPECT_TRUE(reply->hasErrors()); EXPECT_EQUAL((uint32_t)mbus::ErrorCode::DECODE_ERROR, reply->getError(0).getCode()); @@ -196,13 +198,13 @@ Test::testFactory(TestData &data) data._dstProtocol->putRoutableFactory(MyMessage::TYPE, IRoutableFactory::SP(new MyMessageFactory()), vespalib::VersionSpecification()); EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted()); - mbus::Message::UP msg = data._dstHandler.getMessage(600); - ASSERT_TRUE(msg.get() != NULL); + mbus::Message::UP msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg); reply.reset(new MyReply()); reply->swapState(*msg); data._dstSession->reply(std::move(reply)); - reply = data._srcHandler.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + reply = data._srcHandler.getReply(TIMEOUT); + ASSERT_TRUE(reply); fprintf(stderr, "%s\n", reply->getTrace().toString().c_str()); EXPECT_TRUE(reply->hasErrors()); EXPECT_EQUAL((uint32_t)mbus::ErrorCode::ENCODE_ERROR, reply->getError(0).getCode()); @@ -212,13 +214,13 @@ Test::testFactory(TestData &data) data._dstProtocol->putRoutableFactory(MyReply::TYPE, IRoutableFactory::SP(new MyReplyFactory()), vespalib::VersionSpecification()); EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted()); - msg = data._dstHandler.getMessage(600); - ASSERT_TRUE(msg.get() != NULL); + msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg); reply.reset(new MyReply()); reply->swapState(*msg); data._dstSession->reply(std::move(reply)); - reply = data._srcHandler.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + reply = data._srcHandler.getReply(TIMEOUT); + ASSERT_TRUE(reply); fprintf(stderr, "%s\n", reply->getTrace().toString().c_str()); EXPECT_TRUE(reply->hasErrors()); EXPECT_EQUAL((uint32_t)mbus::ErrorCode::DECODE_ERROR, reply->getError(0).getCode()); @@ -228,13 +230,13 @@ Test::testFactory(TestData &data) data._srcProtocol->putRoutableFactory(MyReply::TYPE, IRoutableFactory::SP(new MyReplyFactory()), vespalib::VersionSpecification()); EXPECT_TRUE(data._srcSession->send(mbus::Message::UP(new MyMessage()), route).isAccepted()); - msg = data._dstHandler.getMessage(600); - ASSERT_TRUE(msg.get() != NULL); + msg = data._dstHandler.getMessage(TIMEOUT); + ASSERT_TRUE(msg); reply.reset(new MyReply()); reply->swapState(*msg); data._dstSession->reply(std::move(reply)); - reply = data._srcHandler.getReply(600); - ASSERT_TRUE(reply.get() != NULL); + reply = data._srcHandler.getReply(TIMEOUT); + ASSERT_TRUE(reply); fprintf(stderr, "%s\n", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp index b18653d272d..f1557b8a305 100644 --- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp +++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp @@ -115,7 +115,7 @@ Test::Main() void Test::testAdvanced(TestData &data) { - const double TIMEOUT = 60; + const duration TIMEOUT = 60s; IProtocol::SP protocol(new SimpleProtocol()); SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol); simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE))); @@ -130,41 +130,41 @@ Test::testAdvanced(TestData &data) // Initial send. Message::UP msg = data._fooHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._fooSession->acknowledge(std::move(msg)); msg = data._barHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "bar")); data._barSession->reply(std::move(reply)); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz1")); data._bazSession->reply(std::move(reply)); // First retry. - msg = data._fooHandler.getMessage(0); + msg = data._fooHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); msg = data._barHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._barSession->acknowledge(std::move(msg)); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "baz2")); data._bazSession->reply(std::move(reply)); // Second retry. - msg = data._fooHandler.getMessage(0); + msg = data._fooHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); - msg = data._barHandler.getMessage(0); + msg = data._barHandler.getMessage(duration::zero()); ASSERT_TRUE(msg.get() == NULL); msg = data._bazHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::FATAL_ERROR, "baz3")); @@ -172,7 +172,7 @@ Test::testAdvanced(TestData &data) // Done. reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::FATAL_ERROR, reply->getError(0).getCode()); diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp index 105da2b3bd1..e10d87ed2ba 100644 --- a/messagebus/src/tests/choke/choke.cpp +++ b/messagebus/src/tests/choke/choke.cpp @@ -71,13 +71,13 @@ TestData::start() _srcSession = _srcServer.mb.createSourceSession(SourceSessionParams() .setThrottlePolicy(IThrottlePolicy::SP()) .setReplyHandler(_srcHandler)); - if (_srcSession.get() == NULL) { + if ( ! _srcSession) { return false; } _dstSession = _dstServer.mb.createDestinationSession(DestinationSessionParams() .setName("session") .setMessageHandler(_dstHandler)); - if (_dstSession.get() == NULL) { + if ( ! _dstSession) { return false; } if (!_srcServer.waitSlobrok("dst/session", 1u)) { @@ -108,7 +108,7 @@ Test::Main() TEST_DONE(); } -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; //////////////////////////////////////////////////////////////////////////////// // @@ -131,11 +131,11 @@ Test::testMaxCount(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); if (i < max) { Message::UP msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } else { Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); } @@ -146,14 +146,14 @@ Test::testMaxCount(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); msg = reply->getMessage(); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } while (!lst.empty()) { @@ -163,7 +163,7 @@ Test::testMaxCount(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } EXPECT_EQUAL(0u, data._dstServer.mb.getPendingCount()); @@ -185,11 +185,11 @@ Test::testMaxSize(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); if (i < max) { Message::UP msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } else { Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::SESSION_BUSY, reply->getError(0).getCode()); } @@ -200,14 +200,14 @@ Test::testMaxSize(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); msg = reply->getMessage(); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); lst.push_back(msg.release()); } while (!lst.empty()) { @@ -217,7 +217,7 @@ Test::testMaxSize(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } EXPECT_EQUAL(0u, data._dstServer.mb.getPendingSize()); diff --git a/messagebus/src/tests/context/context.cpp b/messagebus/src/tests/context/context.cpp index de9dd1b83a6..39a7ca7b467 100644 --- a/messagebus/src/tests/context/context.cpp +++ b/messagebus/src/tests/context/context.cpp @@ -1,17 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/intermediatesession.h> #include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/routablequeue.h> #include <vespa/messagebus/routing/routingspec.h> #include <vespa/messagebus/sourcesession.h> #include <vespa/messagebus/sourcesessionparams.h> -#include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simplereply.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> @@ -24,7 +20,7 @@ struct Handler : public IMessageHandler Handler(MessageBus &mb) : session() { session = mb.createDestinationSession("session", true, *this); } - ~Handler() { + ~Handler() override { session.reset(); } void handleMessage(Message::UP msg) override { @@ -81,18 +77,18 @@ Test::Main() } EXPECT_EQUAL(queue.size(), 3u); { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 10u); } { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 20u); } { - Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); - ASSERT_TRUE(reply.get() != 0); + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(duration::zero()).release()); + ASSERT_TRUE(reply); EXPECT_EQUAL(reply->getContext().value.UINT64, 30u); } TEST_DONE(); diff --git a/messagebus/src/tests/messagebus/messagebus.cpp b/messagebus/src/tests/messagebus/messagebus.cpp index 7434941a900..cf249e6eaec 100644 --- a/messagebus/src/tests/messagebus/messagebus.cpp +++ b/messagebus/src/tests/messagebus/messagebus.cpp @@ -21,7 +21,7 @@ struct Base { Base() : queue() {} virtual ~Base() { while (queue.size() > 0) { - Routable::UP r = queue.dequeue(0); + Routable::UP r = queue.dequeue(); r->getCallStack().discard(); } } @@ -219,8 +219,8 @@ Test::testSendToAny() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP msg = p->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = p->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); reply->addError(Error(ErrorCode::FATAL_ERROR, "")); @@ -229,8 +229,8 @@ Test::testSendToAny() } EXPECT_TRUE(client->waitQueueSize(300)); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 1); } @@ -262,8 +262,8 @@ Test::testSendToCol() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -273,8 +273,8 @@ Test::testSendToCol() FastOS_Thread::Sleep(100); client->waitQueueSize(300); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); } @@ -296,8 +296,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -316,8 +316,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -328,8 +328,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -341,8 +341,8 @@ Test::testSendToAnyThenCol() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -350,8 +350,8 @@ Test::testSendToAnyThenCol() FastOS_Thread::Sleep(100); client->waitQueueSize(300); while (client->queue.size() > 0) { - Routable::UP reply = client->queue.dequeue(0); - ASSERT_TRUE(reply.get() != 0); + Routable::UP reply = client->queue.dequeue(); + ASSERT_TRUE(reply); ASSERT_TRUE(reply->isReply()); EXPECT_TRUE(static_cast<Reply&>(*reply).getNumErrors() == 0); } @@ -423,8 +423,8 @@ void Test::assertDst(Search& dst) { ASSERT_TRUE(dst.waitQueueSize(1)); - Routable::UP msg = dst.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = dst.queue.dequeue(); + ASSERT_TRUE(msg); dst.session->acknowledge(Message::UP(static_cast<Message*>(msg.release()))); } @@ -432,8 +432,8 @@ void Test::assertItr(DocProc& itr) { ASSERT_TRUE(itr.waitQueueSize(1)); - Routable::UP msg = itr.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = itr.queue.dequeue(); + ASSERT_TRUE(msg); itr.session->forward(std::move(msg)); } @@ -441,8 +441,8 @@ void Test::assertSrc(Client& src) { ASSERT_TRUE(src.waitQueueSize(1)); - Routable::UP msg = src.queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = src.queue.dequeue(); + ASSERT_TRUE(msg); } void @@ -485,8 +485,8 @@ Test::debugTrace() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } @@ -497,8 +497,8 @@ Test::debugTrace() for (uint32_t i = 0; i < searchVec.size(); ++i) { Search *s = searchVec[i]; while (s->queue.size() > 0) { - Routable::UP msg = s->queue.dequeue(0); - ASSERT_TRUE(msg.get() != 0); + Routable::UP msg = s->queue.dequeue(); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); msg->swapState(*reply); s->session->reply(std::move(reply)); @@ -510,21 +510,21 @@ Test::debugTrace() for (uint32_t i = 0; i < dpVec.size(); ++i) { DocProc *p = dpVec[i]; while (p->queue.size() > 0) { - Routable::UP r = p->queue.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = p->queue.dequeue(); + ASSERT_TRUE(r); p->session->forward(std::move(r)); } } client->waitQueueSize(3); - Routable::UP reply = client->queue.dequeue(0); + Routable::UP reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(0); + reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); - reply = client->queue.dequeue(0); + reply = client->queue.dequeue(); fprintf(stderr, "\nTRACE DUMP(level=%d):\n%s\n\n", reply->getTrace().getLevel(), reply->getTrace().toString().c_str()); diff --git a/messagebus/src/tests/resender/resender.cpp b/messagebus/src/tests/resender/resender.cpp index cd7fdbeb6cc..f40dbdc5e4a 100644 --- a/messagebus/src/tests/resender/resender.cpp +++ b/messagebus/src/tests/resender/resender.cpp @@ -3,9 +3,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/messagebus.h> -#include <vespa/messagebus/routing/errordirective.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> -#include <vespa/messagebus/testlib/custompolicy.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> @@ -32,7 +30,7 @@ StringList::add(const string &str) std::vector<string>::push_back(str); return *this; } -static const double GET_MESSAGE_TIMEOUT = 60.0; +static const duration GET_MESSAGE_TIMEOUT = 60s; //////////////////////////////////////////////////////////////////////////////// // @@ -158,20 +156,20 @@ Test::testRetryTag(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); EXPECT_EQUAL(true, msg->getRetryEnabled()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -183,14 +181,14 @@ Test::testRetryEnabledTag(TestData &data) msg->setRetryEnabled(false); EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("dst/session")).isAccepted()); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_EQUAL(false, msg->getRetryEnabled()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -200,16 +198,16 @@ Test::testTransientError(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -219,13 +217,13 @@ Test::testFatalError(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -235,14 +233,14 @@ Test::testDisableRetry(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasErrors()); EXPECT_TRUE(!reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); printf("%s", reply->getTrace().toString().c_str()); } @@ -253,19 +251,19 @@ Test::testRetryDelay(TestData &data) data._retryPolicy->setBaseDelay(0.01); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, -1); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); string trace = reply->getTrace().toString(); EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); @@ -282,19 +280,19 @@ Test::testRequestRetryDelay(TestData &data) data._retryPolicy->setBaseDelay(1); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); for (uint32_t i = 0; i < 5; ++i) { EXPECT_EQUAL(i, msg->getRetry()); replyFromDestination(data, std::move(msg), ErrorCode::APP_TRANSIENT_ERROR, i / 50.0); msg = data._dstHandler.getMessage(GET_MESSAGE_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } replyFromDestination(data, std::move(msg), ErrorCode::APP_FATAL_ERROR, 0); Reply::UP reply = data._srcHandler.getReply(); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(reply->hasFatalErrors()); - msg = data._dstHandler.getMessage(0); - EXPECT_TRUE(msg.get() == NULL); + msg = data._dstHandler.getMessageNow(); + EXPECT_FALSE(msg); string trace = reply->getTrace().toString(); EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); diff --git a/messagebus/src/tests/routable/routable.cpp b/messagebus/src/tests/routable/routable.cpp index a7a35508656..88db25fa9c7 100644 --- a/messagebus/src/tests/routable/routable.cpp +++ b/messagebus/src/tests/routable/routable.cpp @@ -73,7 +73,7 @@ Test::Main() msg.pushHandler(handler); msg.discard(); - Reply::UP reply = handler.getReply(0); + Reply::UP reply = handler.getReply(duration::zero()); ASSERT_FALSE(reply); } { @@ -86,7 +86,7 @@ Test::Main() reply.swapState(msg); reply.discard(); - Reply::UP ap = handler.getReply(0); + Reply::UP ap = handler.getReply(duration::zero()); ASSERT_FALSE(ap); } diff --git a/messagebus/src/tests/routablequeue/routablequeue.cpp b/messagebus/src/tests/routablequeue/routablequeue.cpp index 4e7c918134f..04e90984f88 100644 --- a/messagebus/src/tests/routablequeue/routablequeue.cpp +++ b/messagebus/src/tests/routablequeue/routablequeue.cpp @@ -40,8 +40,8 @@ Test::Main() { RoutableQueue rq; EXPECT_TRUE(rq.size() == 0); - EXPECT_TRUE(rq.dequeue(0).get() == 0); - EXPECT_TRUE(rq.dequeue(100).get() == 0); + EXPECT_TRUE(rq.dequeue().get() == 0); + EXPECT_TRUE(rq.dequeue(100ms).get() == 0); EXPECT_TRUE(TestMessage::getCnt() == 0); EXPECT_TRUE(TestReply::getCnt() == 0); rq.enqueue(Routable::UP(new TestMessage(101))); @@ -61,16 +61,16 @@ Test::Main() EXPECT_TRUE(TestMessage::getCnt() == 2); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 3); EXPECT_TRUE(r->getType() == 101); } EXPECT_TRUE(TestMessage::getCnt() == 1); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 2); EXPECT_TRUE(r->getType() == 201); } @@ -85,16 +85,16 @@ Test::Main() EXPECT_TRUE(TestMessage::getCnt() == 2); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 3); EXPECT_TRUE(r->getType() == 102); } EXPECT_TRUE(TestMessage::getCnt() == 1); EXPECT_TRUE(TestReply::getCnt() == 2); { - Routable::UP r = rq.dequeue(0); - ASSERT_TRUE(r.get() != 0); + Routable::UP r = rq.dequeue(); + ASSERT_TRUE(r); EXPECT_TRUE(rq.size() == 2); EXPECT_TRUE(r->getType() == 202); } diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp index 42c5938fe92..48c86a53160 100644 --- a/messagebus/src/tests/routing/routing.cpp +++ b/messagebus/src/tests/routing/routing.cpp @@ -8,7 +8,6 @@ #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> -#include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/component/vtag.h> @@ -485,7 +484,7 @@ private: bool testTrace(TestData &data, const std::vector<string> &expected); bool testTrace(const std::vector<string> &expected, const Trace &trace); - static const double RECEPTOR_TIMEOUT; + static const duration RECEPTOR_TIMEOUT; public: int Main() override; @@ -540,7 +539,7 @@ public: void requireThatDepthLimitCanBeIgnored(TestData &data); }; -const double Test::RECEPTOR_TIMEOUT = 120.0; +const duration Test::RECEPTOR_TIMEOUT = 120s; TEST_APPHOOK(Test); @@ -613,7 +612,7 @@ bool Test::testAcknowledge(TestData &data) { Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } data._dstSession->acknowledge(std::move(msg)); @@ -632,7 +631,7 @@ bool Test::testTrace(TestData &data, const std::vector<string> &expected) { Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } if (!EXPECT_TRUE(!reply->hasErrors())) { @@ -747,7 +746,7 @@ Test::testNoRoutingTable(TestData &data) EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); Message::UP msg = res.getMessage(); - EXPECT_TRUE(msg.get() != NULL); + EXPECT_TRUE(msg); } void @@ -759,7 +758,7 @@ Test::testUnknownRoute(TestData &data) EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); Message::UP msg = res.getMessage(); - EXPECT_TRUE(msg.get() != NULL); + EXPECT_TRUE(msg); } void @@ -767,7 +766,7 @@ Test::testNoRoute(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route()).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -779,10 +778,10 @@ Test::testRecognizeHopName(TestData &data) .addHop(HopSpec("dst", "dst/session")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -794,10 +793,10 @@ Test::testRecognizeRouteDirective(TestData &data) .addHop(HopSpec("dir", "route:dst")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dir")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -808,10 +807,10 @@ Test::testRecognizeRouteName(TestData &data) .addRoute(RouteSpec("dst").addHop("dst/session")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -823,7 +822,7 @@ Test::testHopResolutionOverflow(TestData &data) .addHop(HopSpec("bar", "foo")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -835,7 +834,7 @@ Test::testRouteResolutionOverflow(TestData &data) .addRoute(RouteSpec("foo").addHop("route:foo")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), "foo").isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -847,13 +846,13 @@ Test::testInsertRoute(TestData &data) .addRoute(RouteSpec("foo").addHop("dst/session").addHop("bar")))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("route:foo baz")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); EXPECT_EQUAL(2u, msg->getRoute().getNumHops()); EXPECT_EQUAL("bar", msg->getRoute().getHop(0).toString()); EXPECT_EQUAL("baz", msg->getRoute().getHop(1).toString()); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -864,7 +863,7 @@ Test::testErrorDirective(TestData &data) route.getHop(0).setDirective(1, IHopDirective::SP(new ErrorDirective("err"))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); EXPECT_EQUAL("err", reply->getError(0).getMessage()); @@ -879,7 +878,7 @@ Test::testSelectError(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom: ]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); LOG(info, "testSelectError trace=%s", reply->getTrace().toString().c_str()); LOG(info, "testSelectError error=%s", reply->getError(0).toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); @@ -895,7 +894,7 @@ Test::testSelectNone(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_SERVICES_FOR_ROUTE, reply->getError(0).getCode()); } @@ -909,10 +908,10 @@ Test::testSelectOne(TestData &data) data._srcServer.mb.putProtocol(protocol); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -922,22 +921,22 @@ Test::testResend1(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[APP_TRANSIENT_ERROR @ localhost]: err1") @@ -957,22 +956,22 @@ Test::testResend2(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); reply.reset(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err2")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Source session accepted a 3 byte message. 1 message(s) now pending.") @@ -1022,13 +1021,13 @@ Test::testNoResend(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err1")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_TRANSIENT_ERROR, reply->getError(0).getCode()); } @@ -1043,16 +1042,16 @@ Test::testSelectOnResend(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1075,16 +1074,16 @@ Test::testNoSelectOnResend(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "err")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1107,10 +1106,10 @@ Test::testCanConsumeError(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/session,dst/unknown]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_TRUE(testTrace(StringList() @@ -1131,7 +1130,7 @@ Test::testCantConsumeError(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:dst/unknown]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); @@ -1152,10 +1151,10 @@ Test::testNestedPolicies(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); } @@ -1173,10 +1172,10 @@ Test::testRemoveReply(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[NO_ADDRESS_FOR_SERVICE @ localhost]") @@ -1197,10 +1196,10 @@ Test::testSetReply(TestData &data) data._retryPolicy->setEnabled(false); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Select:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1221,16 +1220,16 @@ Test::testResendSetAndReuseReply(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[ReuseReply:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_TRANSIENT_ERROR, "dst")); data._dstSession->reply(std::move(reply)); msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); } @@ -1250,10 +1249,10 @@ Test::testResendSetAndRemoveReply(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[RemoveReply:[SetReply:foo],dst/session]")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1271,13 +1270,13 @@ Test::testHopIgnoresReply(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("?dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1291,13 +1290,13 @@ Test::testHopBlueprintIgnoresReply(TestData &data) .addHop(HopSpec("foo", "dst/session").setIgnoreResult(true)))); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Reply::UP reply(new EmptyReply()); reply->swapState(*msg); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "dst")); data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1309,12 +1308,12 @@ Test::testAcceptEmptyRoute(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/session")).isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); const Route &route = msg->getRoute(); EXPECT_EQUAL(0u, route.getNumHops()); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); } void @@ -1333,7 +1332,7 @@ Test::testAbortOnlyActiveNodes(TestData &data) data._retryPolicy->setEnabled(true); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::SEND_ABORTED, reply->getError(1).getCode()); @@ -1344,7 +1343,7 @@ Test::testUnknownPolicy(TestData &data) { EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Unknown]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::UNKNOWN_POLICY, reply->getError(0).getCode()); } @@ -1362,7 +1361,7 @@ Test::testSelectException(TestData &data) Route::parse("[SelectException]")) .isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1383,10 +1382,10 @@ Test::testMergeException(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route) .isAccepted()); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1423,7 +1422,7 @@ Test::requireThatIgnoreFlagIsSerializedWithMessage(TestData &data) { ASSERT_TRUE(testSend(data, "dst/session foo ?bar")); Message::UP msg = data._dstHandler.getMessage(RECEPTOR_TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); Route route = msg->getRoute(); EXPECT_EQUAL(2u, route.getNumHops()); Hop hop = route.getHop(0); @@ -1533,10 +1532,10 @@ Test::testTimeout(TestData &data) { data._retryPolicy->setEnabled(true); data._retryPolicy->setBaseDelay(0.01); - data._srcSession->setTimeout(0.5); + data._srcSession->setTimeout(500ms); EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/unknown")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(1).getCode()); diff --git a/messagebus/src/tests/routingcontext/routingcontext.cpp b/messagebus/src/tests/routingcontext/routingcontext.cpp index fa0d8ed6536..1c971b29ee3 100644 --- a/messagebus/src/tests/routingcontext/routingcontext.cpp +++ b/messagebus/src/tests/routingcontext/routingcontext.cpp @@ -23,7 +23,7 @@ using namespace mbus; using vespalib::make_string; -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; class StringList : public std::vector<string> { public: diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp index c4fc0f908e0..aed4de7b228 100644 --- a/messagebus/src/tests/sendadapter/sendadapter.cpp +++ b/messagebus/src/tests/sendadapter/sendadapter.cpp @@ -59,7 +59,7 @@ public: bool start(); }; -static const int TIMEOUT_SECS = 6; +static const duration TIMEOUT_SECS = 6s; TestData::TestData() : _slobrok(), @@ -121,7 +121,7 @@ testVersionedSend(TestData &data, return false; } msg = data._itrHandler.getMessage(TIMEOUT_SECS); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } LOG(info, "Message version %s serialized at source.", @@ -138,7 +138,7 @@ testVersionedSend(TestData &data, } data._itrSession->forward(std::move(msg)); msg = data._dstHandler.getMessage(TIMEOUT_SECS); - if (!EXPECT_TRUE(msg.get() != NULL)) { + if (!EXPECT_TRUE(msg)) { return false; } LOG(info, "Message version %s serialized at intermediate.", @@ -157,7 +157,7 @@ testVersionedSend(TestData &data, reply->swapState(*msg); data._dstSession->reply(std::move(reply)); reply = data._itrHandler.getReply(); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } LOG(info, "Reply version %s serialized at destination.", @@ -173,7 +173,7 @@ testVersionedSend(TestData &data, } data._itrSession->forward(std::move(reply)); reply = data._srcHandler.getReply(); - if (!EXPECT_TRUE(reply.get() != NULL)) { + if (!EXPECT_TRUE(reply)) { return false; } LOG(info, "Reply version %s serialized at intermediate.", diff --git a/messagebus/src/tests/sequencer/sequencer.cpp b/messagebus/src/tests/sequencer/sequencer.cpp index 218c1e43929..d347c3855cb 100644 --- a/messagebus/src/tests/sequencer/sequencer.cpp +++ b/messagebus/src/tests/sequencer/sequencer.cpp @@ -21,7 +21,7 @@ struct MyQueue : public RoutableQueue { virtual ~MyQueue() { while (size() > 0) { - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); obj->getCallStack().discard(); } } @@ -31,14 +31,14 @@ struct MyQueue : public RoutableQueue { LOG(error, "checkReply(): No reply in queue."); return false; } - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); if (!obj->isReply()) { LOG(error, "checkReply(): Got message when expecting reply."); return false; } Reply::UP reply(static_cast<Reply*>(obj.release())); Message::UP msg = reply->getMessage(); - if (msg.get() == NULL) { + if ( ! msg) { LOG(error, "checkReply(): Reply has no message attached."); return false; } @@ -64,7 +64,7 @@ struct MyQueue : public RoutableQueue { } void replyNext() { - Routable::UP obj = dequeue(0); + Routable::UP obj = dequeue(); Message::UP msg(static_cast<Message*>(obj.release())); Reply::UP reply(new EmptyReply()); diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index 1706da3b55f..e415622707f 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -30,7 +30,7 @@ public: } }; -static const double TIMEOUT = 120; +static const duration TIMEOUT = 120s; TEST_APPHOOK(Test); @@ -77,11 +77,11 @@ Test::requireThatShutdownOnSourceWithPendingIsSafe() SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams() .setThrottlePolicy(IThrottlePolicy::SP()) .setReplyHandler(srcHandler)); - ASSERT_TRUE(srcSession.get() != NULL); + ASSERT_TRUE(srcSession); ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1)); ASSERT_TRUE(srcSession->send(std::move(msg), "dst/session", true).isAccepted()); msg = dstHandler.getMessage(TIMEOUT); - ASSERT_TRUE(msg.get() != NULL); + ASSERT_TRUE(msg); } dstSession->acknowledge(std::move(msg)); } diff --git a/messagebus/src/tests/sourcesession/sourcesession.cpp b/messagebus/src/tests/sourcesession/sourcesession.cpp index 5177cf0e799..c793dd435c8 100644 --- a/messagebus/src/tests/sourcesession/sourcesession.cpp +++ b/messagebus/src/tests/sourcesession/sourcesession.cpp @@ -102,11 +102,11 @@ Test::testSequencing() FastOS_Thread::Sleep(250); EXPECT_TRUE(waitQueueSize(dstQ, 2)); EXPECT_TRUE(waitQueueSize(srcQ, 0)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); EXPECT_TRUE(waitQueueSize(srcQ, 2)); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); } @@ -137,7 +137,7 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); { - Routable::UP r = dstQ.dequeue(0); + Routable::UP r = dstQ.dequeue(); Reply::UP reply(new EmptyReply()); r->swapState(*reply); reply->addError(Error(ErrorCode::FATAL_ERROR, "error")); @@ -153,7 +153,7 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); { - Routable::UP r = dstQ.dequeue(0); + Routable::UP r = dstQ.dequeue(); Reply::UP reply(new EmptyReply()); r->swapState(*reply); reply->addError(Error(ErrorCode::TRANSIENT_ERROR, "error")); @@ -161,12 +161,12 @@ Test::testResendError() } EXPECT_TRUE(waitQueueSize(dstQ, 1)); EXPECT_TRUE(waitQueueSize(srcQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 2)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); { - string trace1 = srcQ.dequeue(0)->getTrace().toString(); - string trace2 = srcQ.dequeue(0)->getTrace().toString(); + string trace1 = srcQ.dequeue()->getTrace().toString(); + string trace2 = srcQ.dequeue()->getTrace().toString(); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace1.c_str()); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace2.c_str()); } @@ -202,7 +202,7 @@ Test::testResendConnDown() msg->getTrace().setLevel(9); EXPECT_TRUE(ss->send(std::move(msg), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dst2Q, 1)); - Routable::UP obj = dst2Q.dequeue(0); + Routable::UP obj = dst2Q.dequeue(); obj->discard(); src.mb.setupRouting(RoutingSpec().addTable(RoutingTableSpec(SimpleProtocol::NAME) .addHop(HopSpec("dst", "dst/session")))); @@ -210,11 +210,11 @@ Test::testResendConnDown() ASSERT_TRUE(waitQueueSize(dstQ, 1)); // fails ASSERT_TRUE(waitQueueSize(srcQ, 0)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); - string trace = srcQ.dequeue(0)->getTrace().toString(); + string trace = srcQ.dequeue()->getTrace().toString(); fprintf(stderr, "\nTRACE DUMP:\n%s\n\n", trace.c_str()); } @@ -240,7 +240,7 @@ Test::testIllegalRoute() ASSERT_TRUE(waitQueueSize(srcQ, 1)); { while (srcQ.size() > 0) { - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); ASSERT_TRUE(routable->isReply()); Reply::UP r(static_cast<Reply*>(routable.release())); EXPECT_EQUAL(1u, r->getNumErrors()); @@ -272,7 +272,7 @@ Test::testNoServices() ASSERT_TRUE(waitQueueSize(srcQ, 1)); { while (srcQ.size() > 0) { - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); ASSERT_TRUE(routable->isReply()); Reply::UP r(static_cast<Reply*>(routable.release())); EXPECT_TRUE(r->getNumErrors() == 1); @@ -300,7 +300,7 @@ Test::testBlockingClose() EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("foo")), "dst").isAccepted()); ss->close(); srcQ.handleMessage(Message::UP(new SimpleMessage("bogus"))); - Routable::UP routable = srcQ.dequeue(0); + Routable::UP routable = srcQ.dequeue(); EXPECT_TRUE(routable->isReply()); } diff --git a/messagebus/src/tests/throttling/throttling.cpp b/messagebus/src/tests/throttling/throttling.cpp index 5d3525e8ba6..6599604bf9a 100644 --- a/messagebus/src/tests/throttling/throttling.cpp +++ b/messagebus/src/tests/throttling/throttling.cpp @@ -3,7 +3,6 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/messagebus/destinationsession.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/routablequeue.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> #include <vespa/messagebus/routing/routingspec.h> @@ -12,7 +11,6 @@ #include <vespa/messagebus/staticthrottlepolicy.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/simplemessage.h> -#include <vespa/messagebus/testlib/simpleprotocol.h> #include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/testserver.h> @@ -141,15 +139,15 @@ Test::testMaxPendingCount() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); @@ -158,11 +156,11 @@ Test::testMaxPendingCount() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1234567890")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 5)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 8)); ASSERT_TRUE(waitQueueSize(dstQ, 0)); } @@ -202,17 +200,17 @@ Test::testMaxPendingSize() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 2)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 2)); EXPECT_TRUE(ss->send(Message::UP(new SimpleMessage("12")), "dst").isAccepted()); EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("1")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 3)); } @@ -244,7 +242,7 @@ Test::testMinOne() EXPECT_TRUE(!ss->send(Message::UP(new SimpleMessage("")), "dst").isAccepted()); EXPECT_TRUE(waitQueueSize(dstQ, 1)); - ds->acknowledge(Message::UP((Message*)dstQ.dequeue(0).release())); + ds->acknowledge(Message::UP((Message*)dstQ.dequeue().release())); ASSERT_TRUE(waitQueueSize(srcQ, 1)); EXPECT_TRUE(waitQueueSize(dstQ, 0)); } diff --git a/messagebus/src/tests/timeout/timeout.cpp b/messagebus/src/tests/timeout/timeout.cpp index b2631e13d9c..980681a9d42 100644 --- a/messagebus/src/tests/timeout/timeout.cpp +++ b/messagebus/src/tests/timeout/timeout.cpp @@ -76,7 +76,7 @@ Test::testMessageExpires() EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode()); - Message::UP msg = dstHandler.getMessage(1); + Message::UP msg = dstHandler.getMessage(1s); if (msg) { msg->discard(); } diff --git a/messagebus/src/vespa/messagebus/common.h b/messagebus/src/vespa/messagebus/common.h index df25bf17973..11594fcdc22 100644 --- a/messagebus/src/vespa/messagebus/common.h +++ b/messagebus/src/vespa/messagebus/common.h @@ -2,17 +2,14 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <chrono> +#include <vespa/vespalib/util/time.h> namespace mbus { // Decide the type of string used once using string = vespalib::string; - -using seconds = std::chrono::duration<double>; -using milliseconds = std::chrono::milliseconds; - - +using duration = vespalib::duration; +using time_point = vespalib::steady_clock::time_point; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.cpp b/messagebus/src/vespa/messagebus/message.cpp index c9fb03dd013..305ffb06aa0 100644 --- a/messagebus/src/vespa/messagebus/message.cpp +++ b/messagebus/src/vespa/messagebus/message.cpp @@ -10,7 +10,6 @@ #include <vespa/log/log.h> LOG_SETUP(".message"); -using namespace std::chrono; namespace mbus { Message::Message() : @@ -58,14 +57,14 @@ Message::swapState(Routable &rhs) Message & Message::setTimeReceivedNow() { - _timeReceived = steady_clock::now(); + _timeReceived = vespalib::steady_clock::now(); return *this; } -milliseconds +duration Message::getTimeRemainingNow() const { - return std::max(milliseconds(0), _timeRemaining - duration_cast<milliseconds>(steady_clock::now() - _timeReceived)); + return std::max(0ns, _timeRemaining - (vespalib::steady_clock::now() - _timeReceived)); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/message.h b/messagebus/src/vespa/messagebus/message.h index 3069e45e6d8..15e7384707c 100644 --- a/messagebus/src/vespa/messagebus/message.h +++ b/messagebus/src/vespa/messagebus/message.h @@ -3,7 +3,6 @@ #include "routable.h" #include <vespa/messagebus/routing/route.h> -#include <chrono> namespace mbus { @@ -55,7 +54,7 @@ public: * * @return The remaining time in milliseconds. */ - milliseconds getTimeRemaining() const { return _timeRemaining; } + duration getTimeRemaining() const { return _timeRemaining; } /** * Sets the numer of milliseconds that remain before this message times @@ -65,7 +64,7 @@ public: * @param timeRemaining The number of milliseconds until expiration. * @return This, to allow chaining. */ - Message &setTimeRemaining(milliseconds timeRemaining) { _timeRemaining = timeRemaining; return *this; } + Message &setTimeRemaining(duration timeRemaining) { _timeRemaining = timeRemaining; return *this; } /** * Returns the number of milliseconds that remain right now before this @@ -79,7 +78,7 @@ public: * * @return The remaining time in milliseconds. */ - milliseconds getTimeRemainingNow() const; + duration getTimeRemainingNow() const; /** * Access the route associated with this message. @@ -185,7 +184,7 @@ public: private: Route _route; time_point _timeReceived; - milliseconds _timeRemaining; + duration _timeRemaining; bool _retryEnabled; uint32_t _retry; }; diff --git a/messagebus/src/vespa/messagebus/network/inetwork.h b/messagebus/src/vespa/messagebus/network/inetwork.h index 037298cf7c0..1777b9e69f5 100644 --- a/messagebus/src/vespa/messagebus/network/inetwork.h +++ b/messagebus/src/vespa/messagebus/network/inetwork.h @@ -62,7 +62,7 @@ public: * @param seconds The timeout. * @return True if ready. */ - virtual bool waitUntilReady(seconds timeout) const = 0; + virtual bool waitUntilReady(duration timeout) const = 0; /** * Register a session name with the network layer. This will make the diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 5ae6b07c3fa..0bc7f9f3399 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -225,12 +225,12 @@ RPCNetwork::start() } bool -RPCNetwork::waitUntilReady(seconds timeout) const +RPCNetwork::waitUntilReady(duration timeout) const { slobrok::api::SlobrokList brokerList; slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList); bool hasConfig = false; - for (uint32_t i = 0; i < timeout.count() * 100; ++i) { + for (int64_t i = 0; i < vespalib::count_ms(timeout)/10; ++i) { if (configurator->poll()) { hasConfig = true; } @@ -240,10 +240,10 @@ RPCNetwork::waitUntilReady(seconds timeout) const std::this_thread::sleep_for(10ms); } if (! hasConfig) { - LOG(error, "failed to get config for slobroks in %2.2f seconds", timeout.count()); + LOG(error, "failed to get config for slobroks in %2.2f seconds", vespalib::to_s(timeout)); } else if (! _mirror->ready()) { auto brokers = brokerList.logString(); - LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), timeout.count()); + LOG(error, "mirror (of %s) failed to become ready in %2.2f seconds", brokers.c_str(), vespalib::to_s(timeout)); } return false; } @@ -320,7 +320,7 @@ void RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients) { SendContext &ctx = *(new SendContext(*this, msg, recipients)); // deletes self - seconds timeout = ctx._msg.getTimeRemainingNow(); + duration timeout = ctx._msg.getTimeRemainingNow(); for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; @@ -373,7 +373,7 @@ RPCNetwork::send(RPCNetwork::SendContext &ctx) make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str())); } else { - std::chrono::milliseconds timeRemaining = ctx._msg.getTimeRemainingNow(); + duration timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); RPCSendAdapter *adapter = getSendAdapter(ctx._version); if (adapter == nullptr) { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 169bdd86dd9..a6c2724929d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -219,7 +219,7 @@ public: void attach(INetworkOwner &owner) override; const string getConnectionSpec() const override; bool start() override; - bool waitUntilReady(seconds timout) const override; + bool waitUntilReady(duration timout) const override; void registerSession(const string &session) override; void unregisterSession(const string &session) override; bool allocServiceAddress(RoutingNode &recipient) override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index e0fae8eabd6..2422638dc05 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -98,22 +98,22 @@ RPCSend::handleDiscard(Context ctx) } void -RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, milliseconds timeRemaining) +RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, duration timeRemaining) { send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); } void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, milliseconds timeRemaining) +RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, duration timeRemaining) { send(recipient, version, FillByCopy(payload), timeRemaining); } void RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & payload, milliseconds timeRemaining) + const PayLoadFiller & payload, duration timeRemaining) { - SendContext::UP ctx(new SendContext(recipient, timeRemaining)); + auto ctx = std::make_unique<SendContext>(recipient, timeRemaining); RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); const Message &msg = recipient.getMessage(); Route route = recipient.getRoute(); @@ -126,7 +126,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.", version.toString().c_str(), _clientIdent.c_str(), - address.getServiceName().c_str(), ctx->getTimeout().count())); + address.getServiceName().c_str(), vespalib::to_s(ctx->getTimeout()))); } if (hop.getIgnoreResult()) { @@ -135,13 +135,13 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str())); } - Reply::UP reply(new EmptyReply()); + auto reply = std::make_unique<EmptyReply>(); reply->getTrace().swap(ctx->getTrace()); _net->getOwner().deliverReply(std::move(reply), recipient); } else { SendContext *ptr = ctx.release(); req->SetContext(FNET_Context(ptr)); - address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout().count(), this); + address.getTarget().getFRTTarget().InvokeAsync(req, vespalib::to_s(ptr->getTimeout()), this); } } @@ -159,12 +159,12 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) { Error error; Trace & trace = ctx->getTrace(); if (!req->CheckReturnTypes(getReturnSpec())) { - reply.reset(new EmptyReply()); + reply = std::make_unique<EmptyReply>(); switch (req->GetErrorCode()) { case FRTE_RPC_TIMEOUT: error = Error(ErrorCode::TIMEOUT, make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s", - serviceName.c_str(), ctx->getTimeout().count(), req->GetErrorMessage())); + serviceName.c_str(), vespalib::to_s(ctx->getTimeout()), req->GetErrorMessage())); break; case FRTE_RPC_CONNECTION: error = Error(ErrorCode::CONNECTION_ERROR, diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index e7bf5495974..f3a9177d236 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -44,7 +44,7 @@ public: virtual uint32_t getTraceLevel() const = 0; virtual bool useRetry() const = 0; virtual uint32_t getRetries() const = 0; - virtual milliseconds getRemainingTime() const = 0; + virtual duration getRemainingTime() const = 0; virtual vespalib::stringref getRoute() const = 0; virtual vespalib::stringref getSession() const = 0; virtual BlobRef getPayload() const = 0; @@ -59,13 +59,13 @@ protected: Error & error, vespalib::TraceNode & rootTrace) const = 0; virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const = 0; + const PayLoadFiller &filler, duration timeRemaining) const = 0; virtual const char * getReturnSpec() const = 0; virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0; virtual std::unique_ptr<Params> toParams(const FRT_Values ¶m) const = 0; void send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & filler, milliseconds timeRemaining); + const PayLoadFiller & filler, duration timeRemaining); std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version, BlobRef payload, Error & error) const; /** @@ -89,9 +89,9 @@ private: void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, milliseconds timeRemaining) final override; + Blob payload, duration timeRemaining) final override; void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, milliseconds timeRemaining) final override; + BlobRef payload, duration timeRemaining) final override; void RequestDone(FRT_RPCRequest *req) final override; void handleReply(std::unique_ptr<Reply> reply) final override; }; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h index 0b620b3b11f..0e299366e77 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h @@ -14,18 +14,18 @@ public: using UP = std::unique_ptr<SendContext>; SendContext(const SendContext &) = delete; SendContext & operator = (const SendContext &) = delete; - SendContext(mbus::RoutingNode &recipient, milliseconds timeRemaining) + SendContext(mbus::RoutingNode &recipient, duration timeRemaining) : _recipient(recipient), _trace(recipient.getTrace().getLevel()), _timeout(timeRemaining) { } mbus::RoutingNode &getRecipient() { return _recipient; } mbus::Trace &getTrace() { return _trace; } - seconds getTimeout() { return _timeout; } + duration getTimeout() { return _timeout; } private: mbus::RoutingNode &_recipient; mbus::Trace _trace; - seconds _timeout; + duration _timeout; }; /** diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h index cc89bb022b9..15b63c3117c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h @@ -43,7 +43,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, milliseconds timeRemaining) = 0; + BlobRef payload, duration timeRemaining) = 0; /** * Performs the actual sending to the given recipient. @@ -54,7 +54,7 @@ public: * @param timeRemaining The time remaining until the message expires. */ virtual void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, milliseconds timeRemaining) = 0; + Blob payload, duration timeRemaining) = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp index e902aa20965..388ab3309c4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp @@ -4,7 +4,6 @@ #include "rpcnetwork.h" #include "rpcserviceaddress.h" #include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/tracelevel.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/frt/reflection.h> @@ -59,7 +58,7 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder) void RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const + const PayLoadFiller &filler, duration timeRemaining) const { FRT_Values &args = *req.GetParams(); @@ -69,7 +68,7 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, args.AddString(address.getSessionName().c_str()); args.AddInt8(msg.getRetryEnabled() ? 1 : 0); args.AddInt32(msg.getRetry()); - args.AddInt64(timeRemaining.count()); + args.AddInt64(vespalib::count_ms(timeRemaining)); args.AddString(msg.getProtocol().c_str()); filler.fill(args); args.AddInt32(traceLevel); @@ -85,7 +84,7 @@ public: uint32_t getTraceLevel() const override { return _args[8]._intval32; } bool useRetry() const override { return _args[3]._intval8 != 0; } uint32_t getRetries() const override { return _args[4]._intval32; } - milliseconds getRemainingTime() const override { return milliseconds(_args[5]._intval64); } + duration getRemainingTime() const override { return std::chrono::milliseconds(_args[5]._intval64); } vespalib::Version getVersion() const override { return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); @@ -135,7 +134,7 @@ RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error reply = decode(protocolName, version, payload, error); } if ( ! reply ) { - reply.reset(new EmptyReply()); + reply = std::make_unique<EmptyReply>(); } reply->setRetryDelay(retryDelay); for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 3265c304830..249acc50e0c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const override; + const PayLoadFiller &filler, duration timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp index 3e453bb60eb..3b0c10500b9 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp @@ -101,7 +101,7 @@ private: void RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const + const PayLoadFiller &filler, duration timeRemaining) const { FRT_Values &args = *req.GetParams(); req.SetMethodName(METHOD_NAME); @@ -118,7 +118,7 @@ RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Rout root.setString(SESSION_F, address.getSessionName()); root.setBool(USERETRY_F, msg.getRetryEnabled()); root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMELEFT_F, timeRemaining.count()); + root.setLong(TIMELEFT_F, vespalib::count_ms(timeRemaining)); root.setString(PROTOCOL_F, msg.getProtocol()); root.setLong(TRACELEVEL_F, traceLevel); filler.fill(BLOB_F, root); @@ -156,7 +156,7 @@ public: uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); } bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); } uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); } - milliseconds getRemainingTime() const override { return milliseconds(_slime.get()[TIMELEFT_F].asLong()); } + duration getRemainingTime() const override { return std::chrono::milliseconds(_slime.get()[TIMELEFT_F].asLong()); } Version getVersion() const override { return Version(_slime.get()[VERSION_F].asString().make_stringref()); diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h index 939c37c81b2..c48aa90a9fb 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h @@ -14,7 +14,7 @@ private: std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, milliseconds timeRemaining) const override; + const PayLoadFiller &filler, duration timeRemaining) const override; std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const override; diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index bb3bf1d172d..63470b6b707 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -23,7 +23,7 @@ RPCTarget::~RPCTarget() } void -RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler) +RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) { bool hasVersion = false; bool shouldInvoke = false; @@ -47,7 +47,7 @@ RPCTarget::resolveVersion(seconds timeout, RPCTarget::IVersionHandler &handler) } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); req->SetMethodName("mbus.getVersion"); - _target.InvokeAsync(req, timeout.count(), this); + _target.InvokeAsync(req, vespalib::to_s(timeout), this); } } diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index 9c089381de7..b6488f25cb7 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -88,7 +88,7 @@ public: * @param timeout The timeout for the request in milliseconds. * @param handler The handler to be called once the version is available. */ - void resolveVersion(seconds timeout, IVersionHandler &handler); + void resolveVersion(duration timeout, IVersionHandler &handler); /** * @return true if the FRT target is valid or has been invoked (which diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp index eb2d93c6688..121056044ba 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.cpp +++ b/messagebus/src/vespa/messagebus/routablequeue.cpp @@ -39,17 +39,17 @@ RoutableQueue::enqueue(Routable::UP r) } Routable::UP -RoutableQueue::dequeue(uint32_t msTimeout) +RoutableQueue::dequeue(duration msTimeout) { steady_clock::time_point startTime = steady_clock::now(); - uint64_t msLeft = msTimeout; + duration msLeft = msTimeout; vespalib::MonitorGuard guard(_monitor); - while (_queue.size() == 0 && msLeft > 0) { + while (_queue.size() == 0 && msLeft > duration::zero()) { if (!guard.wait(msLeft) || _queue.size() > 0) { break; } - uint64_t elapsed = duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - msLeft = (elapsed > msTimeout) ? 0 : msTimeout - elapsed; + duration elapsed = (steady_clock::now() - startTime); + msLeft = (elapsed > msTimeout) ? duration::zero() : msTimeout - elapsed; } if (_queue.size() == 0) { return Routable::UP(); diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h index c0ed35dece8..153686b1669 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.h +++ b/messagebus/src/vespa/messagebus/routablequeue.h @@ -69,7 +69,8 @@ public: * @return the dequeued routable * @param msTimeout how long to wait if the queue is empty **/ - Routable::UP dequeue(uint32_t msTimeout); + Routable::UP dequeue(duration timeout); + Routable::UP dequeue() { return dequeue(duration::zero());} /** * Handle a Message by enqueuing it. diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 41fe01625ae..eece44a922a 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -9,8 +9,6 @@ using vespalib::make_string; -using namespace std::chrono_literals; -using namespace std::chrono; namespace mbus { @@ -75,7 +73,7 @@ SourceSession::send(Message::UP msg) { msg->setTimeReceivedNow(); if (msg->getTimeRemaining() == 0ms) { - msg->setTimeRemaining(duration_cast<milliseconds>(_timeout)); + msg->setTimeRemaining(_timeout); } { vespalib::MonitorGuard guard(_monitor); @@ -145,10 +143,10 @@ SourceSession::close() } SourceSession & -SourceSession::setTimeout(double timeout) +SourceSession::setTimeout(duration timeout) { vespalib::MonitorGuard guard(_monitor); - _timeout = seconds(timeout); + _timeout = timeout; return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 31ebec3555e..0992a3e377b 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -27,7 +27,7 @@ private: Sequencer _sequencer; IReplyHandler &_replyHandler; IThrottlePolicy::SP _throttlePolicy; - seconds _timeout; + duration _timeout; uint32_t _pendingCount; bool _closed; bool _done; @@ -120,7 +120,7 @@ public: * @param timeout The numer of seconds allowed. * @return This, to allow chaining. */ - SourceSession &setTimeout(double timeout); + SourceSession &setTimeout(duration timeout); }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp index e7a99f2a1de..5b0c920f138 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp @@ -7,8 +7,8 @@ namespace mbus { SourceSessionParams::SourceSessionParams() : _replyHandler(nullptr), - _throttlePolicy(new DynamicThrottlePolicy()), - _timeout(180.0) + _throttlePolicy(std::make_shared<DynamicThrottlePolicy>()), + _timeout(180s) { } IThrottlePolicy::SP @@ -20,18 +20,12 @@ SourceSessionParams::getThrottlePolicy() const SourceSessionParams & SourceSessionParams::setThrottlePolicy(IThrottlePolicy::SP throttlePolicy) { - _throttlePolicy = throttlePolicy; + _throttlePolicy = std::move(throttlePolicy); return *this; } -seconds -SourceSessionParams::getTimeout() const -{ - return _timeout; -} - SourceSessionParams & -SourceSessionParams::setTimeout(seconds timeout) +SourceSessionParams::setTimeout(duration timeout) { _timeout = timeout; return *this; diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.h b/messagebus/src/vespa/messagebus/sourcesessionparams.h index 9ee17280d40..588b13a1bd8 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.h +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.h @@ -3,7 +3,6 @@ #include "ireplyhandler.h" #include "ithrottlepolicy.h" -#include <chrono> namespace mbus { @@ -18,7 +17,7 @@ class SourceSessionParams { private: IReplyHandler *_replyHandler; IThrottlePolicy::SP _throttlePolicy; - seconds _timeout; + duration _timeout; public: /** @@ -46,14 +45,14 @@ public: * * @return The total timeout parameter. */ - seconds getTimeout() const; + duration getTimeout() const { return _timeout; } /** * Returns the number of seconds a message can spend trying to succeed. * * @return The timeout in seconds. */ - SourceSessionParams &setTimeout(seconds timeout); + SourceSessionParams &setTimeout(duration timeout); /** * Returns whether or not a reply handler has been assigned to this. diff --git a/messagebus/src/vespa/messagebus/steadytimer.cpp b/messagebus/src/vespa/messagebus/steadytimer.cpp index f64c8f361cd..942a1f4f051 100644 --- a/messagebus/src/vespa/messagebus/steadytimer.cpp +++ b/messagebus/src/vespa/messagebus/steadytimer.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "steadytimer.h" -#include <chrono> +#include <vespa/vespalib/util/time.h> using namespace std::chrono; @@ -9,7 +9,7 @@ namespace mbus { uint64_t SteadyTimer::getMilliTime() const { - return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count(); + return vespalib::count_ms(steady_clock::now().time_since_epoch()); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp index f98a4be05c3..01d644bba09 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp +++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp @@ -26,14 +26,13 @@ Receptor::handleReply(Reply::UP reply) } Message::UP -Receptor::getMessage(double maxWait) +Receptor::getMessage(duration maxWait) { - int64_t ms = (int64_t)(maxWait * 1000); steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_msg.get() == 0) { - int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - if (w <= 0 || !guard.wait(w)) { + duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); + if (w <= duration::zero() || !guard.wait(w)) { break; } } @@ -41,14 +40,13 @@ Receptor::getMessage(double maxWait) } Reply::UP -Receptor::getReply(double maxWait) +Receptor::getReply(duration maxWait) { - int64_t ms = (int)(maxWait * 1000); steady_clock::time_point startTime = steady_clock::now(); vespalib::MonitorGuard guard(_mon); while (_reply.get() == 0) { - int64_t w = ms - duration_cast<milliseconds>(steady_clock::now() - startTime).count(); - if (w <= 0 || !guard.wait(w)) { + duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); + if (w <= duration::zero() || !guard.wait(w)) { break; } } diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index ffc637fa90d..1d98ac62cd2 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -25,8 +25,10 @@ public: ~Receptor(); void handleMessage(Message::UP msg) override; void handleReply(Reply::UP reply) override; - Message::UP getMessage(double maxWait = 120.0); - Reply::UP getReply(double maxWait = 120.0); + Message::UP getMessage(duration maxWait = 120s); + Reply::UP getReply(duration maxWait = 120s); + Message::UP getMessageNow() { return getMessage(duration::zero()); } + Reply::UP getReplyNow() { return getReply(duration::zero()); } }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp index bbd23d52c0b..fe5baab3e40 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp +++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp @@ -4,6 +4,7 @@ #include "slobrok.h" #include "slobrokstate.h" #include <vespa/vespalib/component/vtag.h> +#include <thread> namespace mbus { diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp index f186be68d01..833d941da32 100644 --- a/messagebus_test/src/tests/error/cpp-client.cpp +++ b/messagebus_test/src/tests/error/cpp-client.cpp @@ -35,7 +35,7 @@ App::Main() msg.reset(new SimpleMessage("test")); msg->getTrace().setLevel(9); ss->send(std::move(msg), "test"); - reply = src.getReply(600); // 10 minutes timeout + reply = src.getReply(600s); // 10 minutes timeout if (reply.get() == 0) { fprintf(stderr, "CPP-CLIENT: no reply\n"); } else { diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp index a804bef6785..0d4c622a0df 100644 --- a/messagebus_test/src/tests/trace/trace.cpp +++ b/messagebus_test/src/tests/trace/trace.cpp @@ -103,8 +103,8 @@ Test::Main() Message::UP msg(new SimpleMessage("test")); msg->getTrace().setLevel(1); ss->send(std::move(msg), "test"); - reply = src.getReply(10); - if (reply.get() != NULL) { + reply = src.getReply(10s); + if (reply) { reply->getTrace().getRoot().normalize(); // resending breaks the trace, so retry until it has expected form if (!reply->hasErrors() && reply->getTrace().getRoot().encode() == expect.encode()) { diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index d882d17841e..bd76b559490 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -47,7 +47,7 @@ public: close(); } - document::BucketId createAndSendSampleDocument(uint32_t timeout); + document::BucketId createAndSendSampleDocument(vespalib::duration timeout); std::string getNodes(const std::string& infoString); void sendReply(int idx = -1, @@ -96,7 +96,7 @@ public: PutOperationTest::~PutOperationTest() = default; document::BucketId -PutOperationTest::createAndSendSampleDocument(uint32_t timeout) { +PutOperationTest::createAndSendSampleDocument(vespalib::duration timeout) { auto doc = std::make_shared<Document>(doc_type(), DocumentId("id:test:testdoctype1::")); document::BucketId id = getExternalOperationHandler().getBucketId(doc->getId()); @@ -119,9 +119,11 @@ using RequirePrimaryWritten = bool; } +const vespalib::duration TIMEOUT = 180ms; + TEST_F(PutOperationTest, simple) { setupDistributor(1, 1, "storage:1 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); ASSERT_EQ("Put(BucketId(0x4000000000001dd4), " "id:test:testdoctype1::, timestamp 100, size 45) => 0", @@ -182,7 +184,7 @@ TEST_F(PutOperationTest, do_not_send_inline_split_if_not_configured) { TEST_F(PutOperationTest, node_removed_on_reply) { setupDistributor(2, 2, "storage:2 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); ASSERT_EQ("Put(BucketId(0x4000000000001dd4), " "id:test:testdoctype1::, timestamp 100, size 45) => 0," @@ -206,7 +208,7 @@ TEST_F(PutOperationTest, node_removed_on_reply) { TEST_F(PutOperationTest, storage_failed) { setupDistributor(2, 1, "storage:1 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); sendReply(-1, api::ReturnCode::INTERNAL_FAILURE); @@ -334,7 +336,7 @@ TEST_F(PutOperationTest, do_not_revert_on_failure_after_early_return) { TEST_F(PutOperationTest, revert_successful_copies_when_one_fails) { setupDistributor(3, 4, "storage:4 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true)); @@ -359,7 +361,7 @@ TEST_F(PutOperationTest, no_revert_if_revert_disabled) { SetUp(); setupDistributor(3, 4, "storage:4 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); ASSERT_EQ("Put => 0,Put => 2,Put => 1", _sender.getCommands(true)); @@ -404,7 +406,7 @@ TEST_F(PutOperationTest, do_not_send_CreateBucket_if_already_pending) { TEST_F(PutOperationTest, no_storage_nodes) { setupDistributor(2, 1, "storage:0 distributor:1"); - createAndSendSampleDocument(180); + createAndSendSampleDocument(TIMEOUT); ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), " "timestamp 100) ReturnCode(NOT_CONNECTED, " "Can't store document: No storage nodes available)", diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 67ef3374633..788ac1960dd 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -266,7 +266,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, makeDocumentBucket(document::BucketId(0)), update, api::Timestamp(0))); // Misc settings for checking that propagation works. msg->getTrace().setLevel(6); - msg->setTimeout(6789); + msg->setTimeout(6789ms); msg->setPriority(99); if (options._timestampToUpdate) { msg->setOldTimestamp(options._timestampToUpdate); @@ -517,7 +517,7 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( { // Settings set in sendUpdate(). EXPECT_EQ(6, msg->getTrace().getLevel()); - EXPECT_EQ(6789, msg->getTimeout()); + EXPECT_EQ(6789ms, msg->getTimeout()); EXPECT_EQ(99, msg->getPriority()); } diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index 3bb86eaebd9..5d7871376cb 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -47,7 +47,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil { document::BucketId superBucket, document::BucketId lastBucket, uint32_t maxBuckets = 8, - uint32_t timeoutMS = 500, + vespalib::duration timeout = 500ms, bool visitInconsistentBuckets = false, bool visitRemoves = false, std::string libraryName = "dumpvisitor", @@ -69,7 +69,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil { cmd->setMaximumPendingReplyCount(VisitorOperationTest::MAX_PENDING); cmd->setMaxBucketsPerVisitor(maxBuckets); - cmd->setTimeout(timeoutMS); + cmd->setTimeout(timeout); if (visitInconsistentBuckets) { cmd->setVisitInconsistentBuckets(); } @@ -178,7 +178,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState) msg->addBucketToBeVisited(nullId); msg->setFieldSet("[header]"); msg->setVisitRemoves(); - msg->setTimeout(1234); + msg->setTimeout(1234ms); msg->getTrace().setLevel(7); auto op = createOpWithDefaultConfig(std::move(msg)); @@ -203,7 +203,7 @@ VisitorOperationTest::doStandardVisitTest(const std::string& clusterState) EXPECT_GT(cvc->getToTime(), 0); EXPECT_EQ("[header]", cvc->getFieldSet()); EXPECT_TRUE(cvc->visitRemoves()); - EXPECT_EQ(1234, cvc->getTimeout()); + EXPECT_EQ(1234ms, cvc->getTimeout()); EXPECT_EQ(7, cvc->getTrace().getLevel()); sendReply(*op); @@ -285,7 +285,7 @@ TEST_F(VisitorOperationTest, no_resend_after_timeout_passed) { addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t"); auto op = createOpWithDefaultConfig( - createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20)); + createVisitorCommand("lowtimeoutbusy", id, nullId, 8, 20ms)); op->start(_sender, framework::MilliSecTime(0)); @@ -331,7 +331,7 @@ TEST_F(VisitorOperationTest, user_single_bucket) { userid, nullId, 8, - 500, + 500ms, false, false, "dumpvisitor", @@ -356,7 +356,7 @@ VisitorOperationTest::runVisitor(document::BucketId id, id, lastId, maxBuckets, - 500, + 500ms, false, false, "dumpvisitor", @@ -448,13 +448,7 @@ TEST_F(VisitorOperationTest, empty_buckets_visited_when_visiting_removes) { addNodesToBucketDB(id, "0=0/0/0/1/2/t"); auto op = createOpWithDefaultConfig( - createVisitorCommand("emptybucket", - id, - nullId, - 8, - 500, - false, - true)); + createVisitorCommand("emptybucket", id, nullId, 8, 500ms, false, true)); op->start(_sender, framework::MilliSecTime(0)); @@ -534,7 +528,7 @@ TEST_F(VisitorOperationTest, timeout_does_not_override_critical_error) { document::BucketId(16, 1), nullId, 8, - 500)); // ms timeout + 500ms)); // ms timeout op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0,Visitor Create => 1", @@ -607,7 +601,7 @@ TEST_F(VisitorOperationTest, bucket_high_bit_count) { id, nullId, 8, - 500, + 500ms, false, false, "dumpvisitor", @@ -633,7 +627,7 @@ TEST_F(VisitorOperationTest, bucket_low_bit_count) { id, nullId, 8, - 500, + 500ms, false, false, "dumpvisitor", @@ -829,7 +823,7 @@ TEST_F(VisitorOperationTest, inconsistency_handling) { _sender.clear(); auto op = createOpWithConfig( - createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500, true), + createVisitorCommand("multiplebucketsonesuper", id, nullId, 8, 500ms, true), VisitorOperation::Config(5, 4)); op->start(_sender, framework::MilliSecTime(0)); @@ -988,7 +982,7 @@ VisitorOperationTest::startOperationWith2StorageNodeVisitors(bool inconsistent) id, nullId, 8, - 500, + 500ms, inconsistent)); op->start(_sender, framework::MilliSecTime(0)); @@ -1040,13 +1034,13 @@ TEST_F(VisitorOperationTest, queue_timeout_is_factor_of_total_timeout) { addNodesToBucketDB(id, "0=1/1/1/t,1=1/1/1/t"); auto op = createOpWithDefaultConfig( - createVisitorCommand("foo", id, nullId, 8, 10000)); + createVisitorCommand("foo", id, nullId, 8, 10000ms)); op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); auto& cmd = dynamic_cast<CreateVisitorCommand&>(*_sender.command(0)); - EXPECT_EQ(5000, cmd.getQueueTimeout()); + EXPECT_EQ(5000ms, cmd.getQueueTimeout()); } void diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 44cb92071a1..b46c0236150 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -856,7 +856,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(0); - cmd->setTimeout(50); + cmd->setTimeout(50ms); filestorHandler.schedule(cmd, 0); } @@ -865,7 +865,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(200); - cmd->setTimeout(10000); + cmd->setTimeout(10000ms); filestorHandler.schedule(cmd, 0); } diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index 16b43828120..bc52d7508dc 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -195,7 +195,7 @@ TEST_F(DocumentApiConverterTest, create_visitor) { EXPECT_EQ("myinstance", cmd->getInstanceId()); EXPECT_EQ("control-dest", cmd->getControlDestination()); EXPECT_EQ("data-dest", cmd->getDataDestination()); - EXPECT_EQ(123456u, cmd->getTimeout()); + EXPECT_EQ(123456ms, cmd->getTimeout()); auto msg = toDocumentAPI<documentapi::CreateVisitorMessage>(*cmd); EXPECT_EQ(defaultSpaceName, msg->getBucketSpace()); @@ -210,7 +210,7 @@ TEST_F(DocumentApiConverterTest, create_visitor_high_timeout) { EXPECT_EQ("myinstance", cmd->getInstanceId()); EXPECT_EQ("control-dest", cmd->getControlDestination()); EXPECT_EQ("data-dest", cmd->getDataDestination()); - EXPECT_EQ(std::numeric_limits<int32_t>::max(), cmd->getTimeout()); + EXPECT_EQ(std::numeric_limits<int32_t>::max(), vespalib::count_ms(cmd->getTimeout())); } TEST_F(DocumentApiConverterTest, create_visitor_reply_not_ready) { diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 30ad9b58e9f..178862d8393 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -267,7 +267,7 @@ TEST_F(MergeThrottlerTest, chain) { } auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123); cmd->setPriority(7); - cmd->setTimeout(54321); + cmd->setTimeout(54321ms); StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); cmd->setAddress(address); const uint16_t distributorIndex = 123; @@ -306,7 +306,7 @@ TEST_F(MergeThrottlerTest, chain) { // Ensure priority, cluster state version and timeout is correctly forwarded EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); } _topLinks[lastNodeIdx]->sendDown(fwd); @@ -332,7 +332,7 @@ TEST_F(MergeThrottlerTest, chain) { } EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); _topLinks[executorNode]->sendDown(fwd); } @@ -359,7 +359,7 @@ TEST_F(MergeThrottlerTest, chain) { fwd = _bottomLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET); EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); auto reply = std::make_shared<MergeBucketReply>(dynamic_cast<const MergeBucketCommand&>(*fwd)); reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<")); diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index c2074c53dd7..f88b59f50a5 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -246,7 +246,7 @@ void StateManagerTest::mark_reported_node_state_up() { void StateManagerTest::send_down_get_node_state_request(uint16_t controller_index) { auto cmd = std::make_shared<api::GetNodeStateCommand>( std::make_unique<NodeState>(NodeType::STORAGE, State::UP)); - cmd->setTimeout(10000000); + cmd->setTimeout(10000000ms); cmd->setSourceIndex(controller_index); _upper->sendDown(cmd); } @@ -320,7 +320,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat force_current_cluster_state_version(12345); auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); - cmd->setTimeout(10000000); + cmd->setTimeout(10000000ms); cmd->setSourceIndex(0); _upper->sendDown(cmd); diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index c152e4c5191..359a242ff5d 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -16,14 +16,13 @@ namespace storage { namespace { std::shared_ptr<api::CreateVisitorCommand> getCommand( - vespalib::stringref name, int timeout, + vespalib::stringref name, vespalib::duration timeout, uint8_t priority = 0) { vespalib::asciistream ost; - ost << name << " t=" << timeout << " p=" << static_cast<unsigned int>(priority); + ost << name << " t=" << vespalib::count_ms(timeout) << " p=" << static_cast<unsigned int>(priority); // Piggyback name in document selection - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "", "", ost.str())); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "", "", ost.str()); cmd->setQueueTimeout(timeout); cmd->setPriority(priority); return cmd; @@ -43,13 +42,13 @@ TEST(CommandQueueTest, fifo) { ASSERT_TRUE(queue.empty()); // Use all default priorities, meaning what comes out should be in the same order // as what went in - queue.add(getCommand("first", 1)); - queue.add(getCommand("second", 10)); - queue.add(getCommand("third", 5)); - queue.add(getCommand("fourth", 0)); - queue.add(getCommand("fifth", 3)); - queue.add(getCommand("sixth", 14)); - queue.add(getCommand("seventh", 7)); + queue.add(getCommand("first", 1ms)); + queue.add(getCommand("second", 10ms)); + queue.add(getCommand("third", 5ms)); + queue.add(getCommand("fourth", 0ms)); + queue.add(getCommand("fifth", 3ms)); + queue.add(getCommand("sixth", 14ms)); + queue.add(getCommand("seventh", 7ms)); ASSERT_FALSE(queue.empty()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; @@ -74,16 +73,16 @@ TEST(CommandQueueTest, fifo_with_priorities) { CommandQueue<api::CreateVisitorCommand> queue(clock); ASSERT_TRUE(queue.empty()); - queue.add(getCommand("first", 1, 10)); + queue.add(getCommand("first", 1ms, 10)); EXPECT_EQ("first t=1 p=10", getCommandString(queue.peekLowestPriorityCommand())); - queue.add(getCommand("second", 10, 22)); - queue.add(getCommand("third", 5, 9)); + queue.add(getCommand("second", 10ms, 22)); + queue.add(getCommand("third", 5ms, 9)); EXPECT_EQ("second t=10 p=22", getCommandString(queue.peekLowestPriorityCommand())); - queue.add(getCommand("fourth", 0, 22)); - queue.add(getCommand("fifth", 3, 22)); + queue.add(getCommand("fourth", 0ms, 22)); + queue.add(getCommand("fifth", 3ms, 22)); EXPECT_EQ("fifth t=3 p=22", getCommandString(queue.peekLowestPriorityCommand())); - queue.add(getCommand("sixth", 14, 50)); - queue.add(getCommand("seventh", 7, 0)); + queue.add(getCommand("sixth", 14ms, 50)); + queue.add(getCommand("seventh", 7ms, 0)); EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand())); @@ -111,19 +110,19 @@ TEST(CommandQueueTest, release_oldest) { framework::defaultimplementation::FakeClock clock(framework::defaultimplementation::FakeClock::FAKE_ABSOLUTE); CommandQueue<api::CreateVisitorCommand> queue(clock); ASSERT_TRUE(queue.empty()); - queue.add(getCommand("first", 10)); - queue.add(getCommand("second", 100)); - queue.add(getCommand("third", 1000)); - queue.add(getCommand("fourth", 5)); - queue.add(getCommand("fifth", 3000)); - queue.add(getCommand("sixth", 400)); - queue.add(getCommand("seventh", 700)); + queue.add(getCommand("first", 10ms)); + queue.add(getCommand("second", 100ms)); + queue.add(getCommand("third", 1000ms)); + queue.add(getCommand("fourth", 5ms)); + queue.add(getCommand("fifth", 3000ms)); + queue.add(getCommand("sixth", 400ms)); + queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry; std::list<CommandEntry> timedOut(queue.releaseTimedOut()); ASSERT_TRUE(timedOut.empty()); - clock.addMilliSecondsToTime(400 * 1000); + clock.addMilliSecondsToTime(400); timedOut = queue.releaseTimedOut(); ASSERT_EQ(4, timedOut.size()); std::ostringstream ost; @@ -144,13 +143,13 @@ TEST(CommandQueueTest, release_lowest_priority) { CommandQueue<api::CreateVisitorCommand> queue(clock); ASSERT_TRUE(queue.empty()); - queue.add(getCommand("first", 1, 10)); - queue.add(getCommand("second", 10, 22)); - queue.add(getCommand("third", 5, 9)); - queue.add(getCommand("fourth", 0, 22)); - queue.add(getCommand("fifth", 3, 22)); - queue.add(getCommand("sixth", 14, 50)); - queue.add(getCommand("seventh", 7, 0)); + queue.add(getCommand("first", 1ms, 10)); + queue.add(getCommand("second", 10ms, 22)); + queue.add(getCommand("third", 5ms, 9)); + queue.add(getCommand("fourth", 0ms, 22)); + queue.add(getCommand("fifth", 3ms, 22)); + queue.add(getCommand("sixth", 14ms, 50)); + queue.add(getCommand("seventh", 7ms, 0)); ASSERT_EQ(7u, queue.size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; @@ -177,13 +176,13 @@ TEST(CommandQueueTest, delete_iterator) { framework::defaultimplementation::FakeClock clock; CommandQueue<api::CreateVisitorCommand> queue(clock); ASSERT_TRUE(queue.empty()); - queue.add(getCommand("first", 10)); - queue.add(getCommand("second", 100)); - queue.add(getCommand("third", 1000)); - queue.add(getCommand("fourth", 5)); - queue.add(getCommand("fifth", 3000)); - queue.add(getCommand("sixth", 400)); - queue.add(getCommand("seventh", 700)); + queue.add(getCommand("first", 10ms)); + queue.add(getCommand("second", 100ms)); + queue.add(getCommand("third", 1000ms)); + queue.add(getCommand("fourth", 5ms)); + queue.add(getCommand("fifth", 3000ms)); + queue.add(getCommand("sixth", 400ms)); + queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin(); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 1275372b73b..b7eb7fee3ec 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -617,7 +617,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); + cmd->setQueueTimeout(0ms); _top->sendDown(cmd); _top->waitForMessages(i+1, 60); } @@ -629,7 +629,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); + cmd->setQueueTimeout(0ms); _top->sendDown(cmd); } @@ -698,7 +698,7 @@ TEST_F(VisitorManagerTest, visitor_cleanup) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); + cmd->setQueueTimeout(0ms); _top->sendDown(cmd); } @@ -730,7 +730,7 @@ TEST_F(VisitorManagerTest, abort_on_failed_visitor_info) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); + cmd->setQueueTimeout(0ms); _top->sendDown(cmd); } @@ -765,7 +765,7 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) { makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234"); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); + cmd->setQueueTimeout(0ms); _top->sendDown(cmd); ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ILLEGAL_PARAMETERS)); @@ -782,8 +782,8 @@ TEST_F(VisitorManagerTest, visitor_queue_timeout) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(1); - cmd->setTimeout(100 * 1000 * 1000); + cmd->setQueueTimeout(1ms); + cmd->setTimeout(100 * 1000 * 1000ms); _top->sendDown(cmd); _node->getClock().addSecondsToTime(1000); @@ -807,8 +807,8 @@ TEST_F(VisitorManagerTest, visitor_processing_timeout) { auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); - cmd->setQueueTimeout(0); - cmd->setTimeout(100); + cmd->setQueueTimeout(0ms); + cmd->setTimeout(100ms); _top->sendDown(cmd); // Wait for Put before increasing the clock @@ -825,7 +825,7 @@ namespace { uint32_t nextVisitor = 0; api::StorageMessage::Id -sendCreateVisitor(uint32_t timeout, DummyStorageLink& top, uint8_t priority = 127) { +sendCreateVisitor(vespalib::duration timeout, DummyStorageLink& top, uint8_t priority = 127) { std::ostringstream ost; ost << "testvis" << ++nextVisitor; api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); @@ -851,25 +851,25 @@ TEST_F(VisitorManagerTest, prioritized_visitor_queing) { // First 4 should just start.. for (uint32_t i = 0; i < 4; ++i) { - ids[i] = sendCreateVisitor(i, *_top, i); + ids[i] = sendCreateVisitor(i*1ms, *_top, i); } // Next ones should be queued - (Better not finish before we get here) // Submit with higher priorities for (uint32_t i = 0; i < 4; ++i) { - ids[i + 4] = sendCreateVisitor(1000, *_top, 100 - i); + ids[i + 4] = sendCreateVisitor(1000ms, *_top, 100 - i); } // Queue is now full with a pri 100 visitor at its end // Send a lower pri visitor that will be busy-returned immediately - ids[8] = sendCreateVisitor(1000, *_top, 130); + ids[8] = sendCreateVisitor(1000ms, *_top, 130); uint64_t message_id = 0; ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id)); ASSERT_EQ(ids[8], message_id); // Send a higher pri visitor that will take the place of pri 100 visitor - ids[9] = sendCreateVisitor(1000, *_top, 60); + ids[9] = sendCreateVisitor(1000ms, *_top, 60); ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id)); ASSERT_EQ(ids[4], message_id); @@ -917,44 +917,44 @@ TEST_F(VisitorManagerTest, prioritized_max_concurrent_visitors) { // First 4 should just start.. for (uint32_t i = 0; i < 4; ++i) { - ids[i] = sendCreateVisitor(i, *_top, i); + ids[i] = sendCreateVisitor(i*1ms, *_top, i); } // Low pri messages; get put into queue for (uint32_t i = 0; i < 6; ++i) { - ids[i + 4] = sendCreateVisitor(1000, *_top, 203 - i); + ids[i + 4] = sendCreateVisitor(1000ms, *_top, 203 - i); } // Higher pri message: fits happily into 1 extra concurrent slot - ids[10] = sendCreateVisitor(1000, *_top, 190); + ids[10] = sendCreateVisitor(1000ms, *_top, 190); // Should punch pri203 msg out of the queue -> busy - ids[11] = sendCreateVisitor(1000, *_top, 197); + ids[11] = sendCreateVisitor(1000ms, *_top, 197); uint64_t message_id = 0; ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id)); ASSERT_EQ(ids[4], message_id); // No concurrency slots left for this message -> busy - ids[12] = sendCreateVisitor(1000, *_top, 204); + ids[12] = sendCreateVisitor(1000ms, *_top, 204); ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id)); ASSERT_EQ(ids[12], message_id); // Gets a concurrent slot - ids[13] = sendCreateVisitor(1000, *_top, 80); + ids[13] = sendCreateVisitor(1000ms, *_top, 80); // Kicks pri 202 out of the queue -> busy - ids[14] = sendCreateVisitor(1000, *_top, 79); + ids[14] = sendCreateVisitor(1000ms, *_top, 79); ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY, -1, -1, &message_id)); ASSERT_EQ(ids[5], message_id); // Gets a concurrent slot - ids[15] = sendCreateVisitor(1000, *_top, 63); + ids[15] = sendCreateVisitor(1000ms, *_top, 63); // Very Important Visitor(tm) gets a concurrent slot - ids[16] = sendCreateVisitor(1000, *_top, 0); + ids[16] = sendCreateVisitor(1000ms, *_top, 0); std::vector<document::Document::SP> docs; std::vector<document::DocumentId> docIds; @@ -1018,11 +1018,11 @@ TEST_F(VisitorManagerTest, visitor_queing_zero_queue_size) { // First 4 should just start.. for (uint32_t i = 0; i < 4; ++i) { - sendCreateVisitor(i, *_top, i); + sendCreateVisitor(i * 1ms, *_top, i); } // Queue size is zero, all visitors will be busy-returned for (uint32_t i = 0; i < 5; ++i) { - sendCreateVisitor(1000, *_top, 100 - i); + sendCreateVisitor(1000ms, *_top, 100 - i); ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::BUSY)); } for (uint32_t session = 0; session < 4; ++session) { @@ -1037,8 +1037,8 @@ TEST_F(VisitorManagerTest, status_page) { _manager->setMaxConcurrentVisitors(1, 1); _manager->setMaxVisitorQueueSize(6); // 1 running, 1 queued - sendCreateVisitor(1000000, *_top, 1); - sendCreateVisitor(1000000, *_top, 128); + sendCreateVisitor(1000000ms, *_top, 1); + sendCreateVisitor(1000000ms, *_top, 128); { TestVisitorMessageSession& session = getSession(0); diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 3aadce4d18e..6836c738b3a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -76,9 +76,7 @@ VisitorOperation::VisitorOperation( } } -VisitorOperation::~VisitorOperation() -{ -} +VisitorOperation::~VisitorOperation() = default; document::BucketId VisitorOperation::getLastBucketVisited() @@ -121,22 +119,21 @@ VisitorOperation::getLastBucketVisited() return newLastBucket; } -uint64_t +vespalib::duration VisitorOperation::timeLeft() const noexcept { const auto elapsed = _operationTimer.getElapsedTime(); - framework::MilliSecTime timeSpent( - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count()); + LOG(spam, - "Checking if visitor has timed out: elapsed=%" PRIu64 " ms, timeout=%u ms", - timeSpent.getTime(), - _msg->getTimeout()); + "Checking if visitor has timed out: elapsed=%ld ms, timeout=%ld ms", + vespalib::count_ms(elapsed), + vespalib::count_ms(_msg->getTimeout())); - if (timeSpent.getTime() >= _msg->getTimeout()) { - return 0; + if (elapsed >= _msg->getTimeout()) { + return vespalib::duration::zero(); } else { - return _msg->getTimeout() - timeSpent.getTime(); + return _msg->getTimeout() - elapsed; } } @@ -581,7 +578,7 @@ VisitorOperation::onStart(DistributorMessageSender& sender) bool VisitorOperation::shouldAbortDueToTimeout() const noexcept { - return timeLeft() == 0; + return timeLeft() <= vespalib::duration::zero(); } void @@ -629,8 +626,8 @@ VisitorOperation::startNewVisitors(DistributorMessageSender& sender) markOperationAsFailed( api::ReturnCode(api::ReturnCode::ABORTED, vespalib::make_string( - "Timeout of %u ms is running out", - _msg->getTimeout()))); + "Timeout of %ld ms is running out", + vespalib::count_ms(_msg->getTimeout())))); } if (maySendNewStorageVisitors()) { @@ -782,7 +779,7 @@ VisitorOperation::sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap, return visitorsSent; } -uint32_t +vespalib::duration VisitorOperation::computeVisitorQueueTimeoutMs() const noexcept { return timeLeft() / 2; diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index ebb5ed4c6aa..fdfe60731f5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -108,17 +108,15 @@ private: bool allowInconsistencies() const noexcept; bool shouldAbortDueToTimeout() const noexcept; bool assignBucketsToNodes(NodeToBucketsMap& nodeToBucketsMap); - int getNumVisitorsToSendForNode(uint16_t node, - uint32_t totalBucketsOnNode) const; - uint32_t computeVisitorQueueTimeoutMs() const noexcept; + int getNumVisitorsToSendForNode(uint16_t node, uint32_t totalBucketsOnNode) const; + vespalib::duration computeVisitorQueueTimeoutMs() const noexcept; bool sendStorageVisitors(const NodeToBucketsMap& nodeToBucketsMap, DistributorMessageSender& sender); void sendStorageVisitor(uint16_t node, const std::vector<document::BucketId>& buckets, uint32_t pending, DistributorMessageSender& sender); - void markCompleted(const document::BucketId& bid, - const api::ReturnCode& code); + void markCompleted(const document::BucketId& bid, const api::ReturnCode& code); /** * Operation failed and we can pin the blame on a specific node. Updates * internal error code and augments error message with the index of the @@ -138,7 +136,7 @@ private: * time point. In case of the current time having passed the timeout * point, function returns 0. */ - uint64_t timeLeft() const noexcept; + vespalib::duration timeLeft() const noexcept; DistributorComponent& _owner; DistributorBucketSpace &_bucketSpace; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 52a4a5c195c..130e039a43e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -77,7 +77,7 @@ JoinOperation::enqueueJoinMessagePerTargetNode( std::shared_ptr<api::JoinBucketsCommand> msg( new api::JoinBucketsCommand(getBucket())); msg->getSourceBuckets() = node.second; - msg->setTimeout(INT_MAX); + msg->setTimeout(vespalib::duration::max()); setCommandMeta(*msg); _tracker.queueCommand(msg, node.first); } @@ -90,8 +90,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP& api::JoinBucketsReply& rep = static_cast<api::JoinBucketsReply&>(*msg); uint16_t node = _tracker.handleReply(rep); if (node == 0xffff) { - LOG(debug, "Ignored reply since node was max uint16_t for unknown " - "reasons"); + LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons"); return; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 66ce4fc0485..445d0972937 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -155,7 +155,7 @@ MergeOperation::onStart(DistributorMessageSender& sender) // Set timeout to one hour to prevent hung nodes that manage to keep // connections open from stalling merges in the cluster indefinitely. - msg->setTimeout(60 * 60 * 1000); + msg->setTimeout(3600s); setCommandMeta(*msg); sender.sendToNode(lib::NodeType::STORAGE, _mnodes[0].index, msg); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 1b40f744a80..57f8bc92316 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -20,7 +20,7 @@ SplitOperation::SplitOperation(const std::string& clusterName, const BucketAndNo _splitCount(splitCount), _splitSize(splitSize) {} -SplitOperation::~SplitOperation() {} +SplitOperation::~SplitOperation() = default; void SplitOperation::onStart(DistributorMessageSender& sender) @@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorMessageSender& sender) msg->setMaxSplitBits(_maxBits); msg->setMinDocCount(_splitCount); msg->setMinByteSize(_splitSize); - msg->setTimeout(INT_MAX); + msg->setTimeout(vespalib::duration::max()); setCommandMeta(*msg); _tracker.queueCommand(msg, entry->getNodeRef(i).getNode()); _ok = true; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 8298b126690..62a520abc87 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -230,7 +230,7 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) distributionHash)); cmd->setPriority(api::StorageMessage::HIGH); - cmd->setTimeout(INT_MAX); + cmd->setTimeout(vespalib::duration::max()); _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index d1631c50880..f773ee774bb 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -355,7 +355,7 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const } bool -FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) +FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime) { if (msg.getType().isReply()) { return false; // Replies must always be processed and cannot time out. @@ -980,7 +980,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) return lck; } - uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])); + std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]))); if (!messageTimedOutInQueue(m, waitTime)) { std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command); @@ -1004,7 +1004,7 @@ FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { api::StorageMessage & m(*iter->_command); - uint64_t waitTime(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])); + std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]))); std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); document::Bucket bucket(iter->_bucket); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index fd6ab5e8b9a..5fc592e11cb 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -325,7 +325,7 @@ private: * Return whether msg has timed out based on waitTime and the message's * specified timeout. */ - static bool messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime); + static bool messageTimedOutInQueue(const api::StorageMessage& msg, vespalib::duration waitTime); /** * Creates and returns a reply with api::TIMEOUT return code for msg. diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index 24f4c9cd731..1efb42a7b22 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -16,7 +16,7 @@ MergeStatus::MergeStatus(framework::Clock& clock, const metrics::LoadType& lt, context(lt, priority, traceLevel) {} -MergeStatus::~MergeStatus() {} +MergeStatus::~MergeStatus() = default; bool MergeStatus::removeFromDiff( diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index 932859cd2d0..082ae053ec0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -24,7 +24,7 @@ public: api::StorageMessage::Id pendingId; std::shared_ptr<api::GetBucketDiffReply> pendingGetDiff; std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff; - uint32_t timeout; + vespalib::duration timeout; framework::MilliSecTimer startTime; spi::Context context; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 499d7ce15ac..978d434847e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -148,7 +148,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) //TODO: Can it be moved ? std::shared_ptr<api::StorageCommand> cmd = storMsgPtr->getCommand(); - cmd->setTimeout(storMsgPtr->getTimeRemaining().count()); + cmd->setTimeout(storMsgPtr->getTimeRemaining()); cmd->setTrace(storMsgPtr->getTrace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr))); @@ -567,7 +567,7 @@ CommunicationManager::sendCommand( cmd->setContext(mbus::Context(msg->getMsgId())); cmd->setRetryEnabled(address.retryEnabled()); - cmd->setTimeRemaining(std::chrono::milliseconds(msg->getTimeout())); + cmd->setTimeRemaining(msg->getTimeout()); cmd->setTrace(msg->getTrace()); sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); break; diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index c6a16de3282..f0c987ee333 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -20,8 +20,6 @@ LOG_SETUP(".documentapiconverter"); using document::BucketSpace; -using std::chrono::milliseconds; - namespace storage { DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri, @@ -140,9 +138,12 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) break; } - if (toMsg.get() != 0) { - milliseconds timeout = std::min(milliseconds(INT_MAX), fromMsg.getTimeRemaining()); - toMsg->setTimeout(timeout.count()); + if (toMsg) { + //TODO getTimeRemainingNow ? + vespalib::duration cappedTimeout = (fromMsg.getTimeRemaining() < 1ms*INT_MAX) + ? fromMsg.getTimeRemaining() + : 1ms*INT_MAX; + toMsg->setTimeout(cappedTimeout); toMsg->setPriority(_priConverter->toStoragePriority(fromMsg.getPriority())); toMsg->setLoadType(fromMsg.getLoadType()); @@ -308,8 +309,8 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) break; } - if (toMsg.get()) { - toMsg->setTimeRemaining(milliseconds(fromMsg.getTimeout())); + if (toMsg) { + toMsg->setTimeRemaining(fromMsg.getTimeout()); toMsg->setContext(mbus::Context(fromMsg.getMsgId())); if (LOG_WOULD_LOG(spam)) { toMsg->getTrace().setLevel(9); diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index c86e1671033..45bd9c64fac 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -152,7 +152,7 @@ FNetListener::RPC_getNodeState2(FRT_RPCRequest *req) : std::unique_ptr<lib::NodeState>())); cmd->setPriority(api::StorageMessage::VERYHIGH); - cmd->setTimeout(req->GetParams()->GetValue(1)._intval32); + cmd->setTimeout(std::chrono::milliseconds(req->GetParams()->GetValue(1)._intval32)); if (req->GetParams()->GetNumValues() > 2) { cmd->setSourceIndex(req->GetParams()->GetValue(2)._intval32); } diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index af01a880fea..9afc8b2d3a5 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -460,14 +460,15 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) && (*cmd->getExpectedState() == *_nodeState || sentReply) && is_up_to_date) { + int64_t msTimeout = vespalib::count_ms(cmd->getTimeout()); LOG(debug, "Received get node state request with timeout of " - "%u milliseconds. Scheduling to be answered in " - "%u milliseconds unless a node state change " + "%ld milliseconds. Scheduling to be answered in " + "%ld milliseconds unless a node state change " "happens before that time.", - cmd->getTimeout(), cmd->getTimeout() * 800 / 1000); + msTimeout, msTimeout * 800 / 1000); TimeStatePair pair( _component.getClock().getTimeInMillis() - + framework::MilliSecTime(cmd->getTimeout() * 800 / 1000), + + framework::MilliSecTime(msTimeout * 800 / 1000), cmd); _queuedStateRequests.emplace_back(std::move(pair)); } else { diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index d129506eb64..ce309d55803 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -16,6 +16,7 @@ #include <boost/multi_index/ordered_index.hpp> #include <boost/multi_index/sequenced_index.hpp> #include <vespa/vespalib/util/printable.h> +#include <vespa/vespalib//util/time.h> #include <vespa/fastos/timestamp.h> #include <vespa/storageframework/generic/clock/clock.h> #include <list> @@ -141,11 +142,10 @@ CommandQueue<Command>::peekNextCommand() const template<class Command> void -CommandQueue<Command>::add( - const std::shared_ptr<Command>& cmd) +CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd) { framework::MicroSecTime time(_clock.getTimeInMicros() - + framework::MicroSecTime(cmd->getQueueTimeout() * 1000000)); + + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout()))); _commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority())); } diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 6330b580eb9..d4a176fa14b 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -309,7 +309,7 @@ VisitorManager::scheduleVisitor( if (_enforceQueueUse || totCount >= maximumConcurrent(*cmd)) { api::CreateVisitorCommand::SP failCommand; - if (cmd->getQueueTimeout() != 0 && _maxVisitorQueueSize > 0) { + if (cmd->getQueueTimeout() != vespalib::duration::zero() && _maxVisitorQueueSize > 0) { if (_visitorQueue.size() < _maxVisitorQueueSize) { // Still room in the queue _visitorQueue.add(cmd); @@ -348,7 +348,7 @@ VisitorManager::scheduleVisitor( std::shared_ptr<api::CreateVisitorReply> reply( new api::CreateVisitorReply(*failCommand)); std::ostringstream ost; - if (cmd->getQueueTimeout() == 0) { + if (cmd->getQueueTimeout() == vespalib::duration::zero()) { ost << "Already running the maximum amount (" << maximumConcurrent(*failCommand) << ") of visitors for this priority (" @@ -632,7 +632,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, it->_command); assert(cmd.get()); out << "<li>" << cmd->getInstanceId() << " - " - << cmd->getQueueTimeout() << ", remaining timeout " + << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout " << (it->_time - time.getTime()) / 1000000 << " ms\n"; } if (_visitorQueue.empty()) { @@ -657,7 +657,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, << "<td>" << it->first << "</td>" << "<td>" << it->second.id << "</td>" << "<td>" << it->second.timestamp << "</td>" - << "<td>" << it->second.timeout << "</td>" + << "<td>" << vespalib::count_ms(it->second.timeout) << "</td>" << "<td>" << it->second.destination << "</td>" << "</tr>\n"; } diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 3a3e743eaf2..3675a824e1d 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -55,7 +55,7 @@ private: struct MessageInfo { api::VisitorId id; time_t timestamp; - uint64_t timeout; + vespalib::duration timeout; std::string destination; }; diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 142e7a89144..006af5edf7d 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -546,7 +546,7 @@ VisitorThread::onCreateVisitor( std::move(messageSession), documentPriority); visitor->attach(cmd, *controlAddress, *dataAddress, - framework::MilliSecTime(cmd->getTimeout())); + framework::MilliSecTime(vespalib::count_ms(cmd->getTimeout()))); } catch (std::exception& e) { // We don't handle exceptions from this code, as we've // added visitor to internal structs we'll end up calling diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index b1a754bbbab..dbd79e4fcca 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -522,7 +522,7 @@ TEST_P(StorageProtocolTest, create_visitor) { cmd->getBuckets() = buckets; cmd->setFieldSet("foo,bar,vekterli"); cmd->setVisitInconsistentBuckets(); - cmd->setQueueTimeout(100); + cmd->setQueueTimeout(100ms); cmd->setPriority(149); auto cmd2 = copyCommand(cmd); EXPECT_EQ("library", cmd2->getLibraryName()); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 466ff85f398..b90153c9517 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -44,7 +44,7 @@ ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const api::GetCommand::UP msg( new api::GetCommand(bucket, did, headerOnly ? "[header]" : "[all]", beforeTimestamp)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -64,7 +64,7 @@ ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const api::Timestamp timestamp(SH::getLong(buf)); api::RemoveCommand::UP msg(new api::RemoveCommand(bucket, did, timestamp)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -88,7 +88,7 @@ ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const } api::RevertCommand::UP msg(new api::RevertCommand(bucket, tokens)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -104,7 +104,7 @@ ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const document::Bucket bucket = getBucket(buf); api::CreateBucketCommand::UP msg(new api::CreateBucketCommand(bucket)); onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -138,7 +138,7 @@ ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const api::MergeBucketCommand::UP msg( new api::MergeBucketCommand(bucket, nodes, timestamp)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -187,7 +187,7 @@ ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const onDecodeDiffEntry(buf, entries[i]); } onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -259,7 +259,7 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const entries[i]._bodyBlob.size()); } onDecodeBucketInfoCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -291,7 +291,7 @@ ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd, entry._info = getBucketInfo(buf); } onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -326,7 +326,7 @@ ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd, api::NotifyBucketChangeReply::UP msg(new api::NotifyBucketChangeReply( static_cast<const api::NotifyBucketChangeCommand&>(cmd))); onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -350,7 +350,7 @@ ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const msg->setMinByteSize(SH::getInt(buf)); msg->setMinDocCount(SH::getInt(buf)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void ProtocolSerialization4_2::onEncode( @@ -403,7 +403,7 @@ ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand& buf.putBoolean(msg.visitRemoves()); buf.putBoolean(msg.getFieldSet() == "[header]"); buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(msg.getQueueTimeout()); + buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); uint32_t size = msg.getParameters().getSerializedSize(); char* docBuffer = buf.allocate(size); @@ -449,12 +449,12 @@ ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const if (SH::getBoolean(buf)) { msg->setVisitInconsistentBuckets(); } - msg->setQueueTimeout(SH::getInt(buf)); + msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); msg->getParameters().deserialize(getTypeRepo(), buf); onDecodeCommand(buf, *msg); msg->setVisitorDispatcherVersion(42); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -471,7 +471,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const vespalib::stringref instanceId = SH::getString(buf); api::DestroyVisitorCommand::UP msg(new api::DestroyVisitorCommand(instanceId)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -485,7 +485,7 @@ ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf { api::DestroyVisitorReply::UP msg(new api::DestroyVisitorReply(static_cast<const api::DestroyVisitorCommand&>(cmd))); onDecodeReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } void @@ -505,7 +505,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const api::RemoveLocationCommand::UP msg; msg.reset(new api::RemoveLocationCommand(documentSelection, bucket)); onDecodeCommand(buf, *msg); - return api::StorageCommand::UP(msg.release()); + return msg; } void @@ -519,7 +519,7 @@ ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf { api::RemoveLocationReply::UP msg(new api::RemoveLocationReply(static_cast<const api::RemoveLocationCommand&>(cmd))); onDecodeBucketInfoReply(buf, *msg); - return api::StorageReply::UP(msg.release()); + return msg; } // Utility functions for serialization diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp index 32680b24683..b0a1685ed8c 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp @@ -132,7 +132,7 @@ ProtocolSerialization5_1::onEncode( buf.putBoolean(msg.visitRemoves()); buf.putString(msg.getFieldSet()); buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(msg.getQueueTimeout()); + buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); uint32_t size = msg.getParameters().getSerializedSize(); char* docBuffer = buf.allocate(size); @@ -181,7 +181,7 @@ ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const if (SH::getBoolean(buf)) { msg->setVisitInconsistentBuckets(); } - msg->setQueueTimeout(SH::getInt(buf)); + msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); msg->getParameters().deserialize(getTypeRepo(), buf); onDecodeCommand(buf, *msg); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index d0446f52893..bf56dd56db6 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -1162,7 +1162,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorComman ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); - ctrl_meta->set_queue_timeout(msg.getQueueTimeout()); + ctrl_meta->set_queue_timeout(vespalib::count_ms(msg.getQueueTimeout())); ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); @@ -1211,7 +1211,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu cmd->setControlDestination(ctrl_meta.control_destination()); cmd->setDataDestination(ctrl_meta.data_destination()); cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); - cmd->setQueueTimeout(ctrl_meta.queue_timeout()); + cmd->setQueueTimeout(std::chrono::milliseconds(ctrl_meta.queue_timeout())); cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl diff --git a/storageapi/src/vespa/storageapi/message/visitor.cpp b/storageapi/src/vespa/storageapi/message/visitor.cpp index dbda2d0d0c2..aeb58f30fb4 100644 --- a/storageapi/src/vespa/storageapi/message/visitor.cpp +++ b/storageapi/src/vespa/storageapi/message/visitor.cpp @@ -34,7 +34,7 @@ CreateVisitorCommand::CreateVisitorCommand(document::BucketSpace bucketSpace, _visitRemoves(false), _fieldSet("[all]"), _visitInconsistentBuckets(false), - _queueTimeout(2000), + _queueTimeout(2000ms), _maxPendingReplyCount(2), _version(50), _maxBucketsPerVisitor(1) @@ -82,15 +82,12 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose, out << ") {"; out << "\n" << indent << " Library name: '" << _libName << "'"; out << "\n" << indent << " Instance Id: '" << _instanceId << "'"; - out << "\n" << indent << " Control Destination: '" - << _controlDestination << "'"; - out << "\n" << indent << " Data Destination: '" - << _dataDestination << "'"; + out << "\n" << indent << " Control Destination: '" << _controlDestination << "'"; + out << "\n" << indent << " Data Destination: '" << _dataDestination << "'"; out << "\n" << indent << " Doc Selection: '" << _docSelection << "'"; - out << "\n" << indent << " Max pending: '" - << _maxPendingReplyCount << "'"; - out << "\n" << indent << " Timeout: " << getTimeout(); - out << "\n" << indent << " Queue timeout: " << _queueTimeout << " ms"; + out << "\n" << indent << " Max pending: '" << _maxPendingReplyCount << "'"; + out << "\n" << indent << " Timeout: " << vespalib::count_ms(getTimeout()) << " ms"; + out << "\n" << indent << " Queue timeout: " << vespalib::count_ms(_queueTimeout) << " ms"; out << "\n" << indent << " VisitorDispatcher version: '" << _version << "'"; if (visitRemoves()) { out << "\n" << indent << " Visiting remove entries too"; @@ -109,8 +106,7 @@ CreateVisitorCommand::print(std::ostream& out, bool verbose, } out << "\n" << indent << " "; _params.print(out, verbose, indent + " "); - out << "\n" << indent << " Max buckets: '" - << _maxBucketsPerVisitor << "'"; + out << "\n" << indent << " Max buckets: '" << _maxBucketsPerVisitor << "'"; out << "\n" << indent << "} : "; StorageCommand::print(out, verbose, indent); } else if (_buckets.size() == 2) { diff --git a/storageapi/src/vespa/storageapi/message/visitor.h b/storageapi/src/vespa/storageapi/message/visitor.h index f7dcaa63b20..7189cc67195 100644 --- a/storageapi/src/vespa/storageapi/message/visitor.h +++ b/storageapi/src/vespa/storageapi/message/visitor.h @@ -44,7 +44,7 @@ private: vespalib::string _fieldSet; bool _visitInconsistentBuckets; - uint32_t _queueTimeout; + duration _queueTimeout; uint32_t _maxPendingReplyCount; uint32_t _version; @@ -61,22 +61,17 @@ public: ~CreateVisitorCommand(); void setVisitorCmdId(uint32_t id) { _visitorCmdId = id; } - void setControlDestination(vespalib::stringref d) - { _controlDestination = d; } + void setControlDestination(vespalib::stringref d) { _controlDestination = d; } void setDataDestination(vespalib::stringref d) { _dataDestination = d; } void setParameters(const vdslib::Parameters& params) { _params = params; } - void setMaximumPendingReplyCount(uint32_t count) - { _maxPendingReplyCount = count; } - void setFieldSet(vespalib::stringref fieldSet) - { _fieldSet = fieldSet; } + void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; } + void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; } void setVisitRemoves(bool value = true) { _visitRemoves = value; } - void setVisitInconsistentBuckets(bool visitInconsistent = true) - { _visitInconsistentBuckets = visitInconsistent; } - void addBucketToBeVisited(const document::BucketId& id) - { _buckets.push_back(id); } + void setVisitInconsistentBuckets(bool visitInconsistent = true) { _visitInconsistentBuckets = visitInconsistent; } + void addBucketToBeVisited(const document::BucketId& id) { _buckets.push_back(id); } void setVisitorId(const VisitorId id) { _visitorId = id; } void setInstanceId(vespalib::stringref id) { _instanceId = id; } - void setQueueTimeout(uint32_t milliSecs) { _queueTimeout = milliSecs; } + void setQueueTimeout(duration milliSecs) { _queueTimeout = milliSecs; } void setFromTime(Timestamp ts) { _fromTime = ts; } void setToTime(Timestamp ts) { _toTime = ts; } @@ -86,24 +81,20 @@ public: document::Bucket getBucket() const override; const vespalib::string & getLibraryName() const { return _libName; } const vespalib::string & getInstanceId() const { return _instanceId; } - const vespalib::string & getControlDestination() const - { return _controlDestination; } + const vespalib::string & getControlDestination() const { return _controlDestination; } const vespalib::string & getDataDestination() const { return _dataDestination; } const vespalib::string & getDocumentSelection() const { return _docSelection; } const vdslib::Parameters& getParameters() const { return _params; } vdslib::Parameters& getParameters() { return _params; } - uint32_t getMaximumPendingReplyCount() const - { return _maxPendingReplyCount; } - const std::vector<document::BucketId>& getBuckets() const - { return _buckets; } + uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; } + const std::vector<document::BucketId>& getBuckets() const { return _buckets; } Timestamp getFromTime() const { return _fromTime; } Timestamp getToTime() const { return _toTime; } std::vector<document::BucketId>& getBuckets() { return _buckets; } bool visitRemoves() const { return _visitRemoves; } const vespalib::string& getFieldSet() const { return _fieldSet; } bool visitInconsistentBuckets() const { return _visitInconsistentBuckets; } - // In millisec - uint32_t getQueueTimeout() const { return _queueTimeout; } + duration getQueueTimeout() const { return _queueTimeout; } void setVisitorDispatcherVersion(uint32_t version) { _version = version; } uint32_t getVisitorDispatcherVersion() const { return _version; } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp index fe33066872a..d9bbf34141a 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storagecommand.h" -#include <limits> #include <vespa/vespalib/util/exceptions.h> #include <ostream> @@ -20,13 +19,13 @@ StorageCommand::StorageCommand(const MessageType& type, Priority p) // Default timeout is unlimited. Set from mbus message. Some internal // use want unlimited timeout, (such as readbucketinfo, repair bucket // etc) - _timeout(std::numeric_limits<uint32_t>().max()), + _timeout(duration::max()), _sourceIndex(0xFFFF) { setPriority(p); } -StorageCommand::~StorageCommand() { } +StorageCommand::~StorageCommand() = default; void StorageCommand::print(std::ostream& out, bool verbose, @@ -36,7 +35,7 @@ StorageCommand::print(std::ostream& out, bool verbose, out << "StorageCommand(" << _type.getName(); if (_priority != NORMAL) out << ", priority = " << static_cast<int>(_priority); if (_sourceIndex != 0xFFFF) out << ", source = " << _sourceIndex; - out << ", timeout = " << _timeout << " ms"; + out << ", timeout = " << vespalib::count_ms(_timeout) << " ms"; out << ")"; } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h index 2885dac3b91..c835168c5b7 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h @@ -19,7 +19,7 @@ namespace storage::api { class StorageReply; class StorageCommand : public StorageMessage { - uint32_t _timeout; /** Timeout of command in milliseconds */ + duration _timeout; /** Timeout of command in milliseconds */ /** Sets what node this message origins from. 0xFFFF is unset. */ uint16_t _sourceIndex; @@ -37,9 +37,9 @@ public: uint16_t getSourceIndex() const { return _sourceIndex; } /** Set timeout in milliseconds. */ - void setTimeout(uint32_t milliseconds) { _timeout = milliseconds; } + void setTimeout(duration milliseconds) { _timeout = milliseconds; } /** Get timeout in milliseconds. */ - uint32_t getTimeout() const { return _timeout; } + duration getTimeout() const { return _timeout; } /** Used to set a new id so the message can be resent. */ void setNewId() { StorageMessage::setNewMsgId(); } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index 8c2338a020c..e119884bd1f 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -70,6 +70,8 @@ public: \ namespace storage::api { +using duration = vespalib::duration; + /** * @class MessageType * @ingroup messageapi diff --git a/vespalib/src/vespa/vespalib/util/sync.h b/vespalib/src/vespa/vespalib/util/sync.h index 12961dffef7..8458bc19629 100644 --- a/vespalib/src/vespa/vespalib/util/sync.h +++ b/vespalib/src/vespa/vespalib/util/sync.h @@ -378,7 +378,7 @@ public: bool wait(int msTimeout) { return wait(std::chrono::milliseconds(msTimeout)); } - bool wait(std::chrono::milliseconds timeout) { + bool wait(std::chrono::nanoseconds timeout) { return _cond->wait_for(_guard, timeout) == std::cv_status::no_timeout; } /** |