diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-05-31 20:35:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-31 20:35:39 +0200 |
commit | b17c853d5f593585eb50a794a3824951b4032278 (patch) | |
tree | bb0d30877805eb41e425ad3202091ea96624bd48 /storage | |
parent | e601f573e2f2351525343a5cf3524b4bf67baa8a (diff) | |
parent | 1ecc3ec1e66b68971845615d5bdaa4a97f3cb88b (diff) |
Merge pull request #6035 from vespa-engine/vekterli/ensure-visitormanager-tests-clean-up-after-themselves
Ensure visitormanager tests clean up after themselves
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/visiting/visitormanagertest.cpp | 261 |
1 files changed, 113 insertions, 148 deletions
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 8b17e851868..9eb1e6d2311 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -19,9 +19,13 @@ #include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> #include <vespa/documentapi/messagebus/messages/visitor.h> #include <vespa/config/common/exceptions.h> +#include <optional> +#include <thread> +#include <chrono> using document::test::makeDocumentBucket; using document::test::makeBucketSpace; +using documentapi::Priority; namespace storage { namespace { @@ -78,8 +82,9 @@ public: std::vector<document::Document::SP >& docs, std::vector<document::DocumentId>& docIds, api::ReturnCode::Result returnCode = api::ReturnCode::OK, - documentapi::Priority::Value priority = documentapi::Priority::PRI_NORMAL_4); + std::optional<Priority::Value> priority = documentapi::Priority::PRI_NORMAL_4); uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs); + void finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex); void testNormalUsage(); void testResending(); @@ -185,8 +190,7 @@ VisitorManagerTest::initializeTest() for (uint32_t i=0; i<10; ++i) { document::BucketId bid(16, i); - std::shared_ptr<api::CreateBucketCommand> cmd( - new api::CreateBucketCommand(makeDocumentBucket(bid))); + auto cmd = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bid)); cmd->setAddress(address); cmd->setSourceIndex(0); _top->sendDown(cmd); @@ -202,8 +206,7 @@ VisitorManagerTest::initializeTest() for (uint32_t i=0; i<docCount; ++i) { document::BucketId bid(16, i); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bid), _documents[i], i+1)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bid), _documents[i], i+1); cmd->setAddress(address); _top->sendDown(cmd); _top->waitForMessages(1, 60); @@ -226,45 +229,40 @@ VisitorManagerTest::addSomeRemoves(bool removeAll) for (uint32_t i=0; i<docCount; i += (removeAll ? 1 : 4)) { // Add it to the database document::BucketId bid(16, i % 10); - std::shared_ptr<api::RemoveCommand> cmd( - new api::RemoveCommand( - makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1)); + auto cmd = std::make_shared<api::RemoveCommand>( + makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1); cmd->setAddress(address); _top->sendDown(cmd); _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); - CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); - std::shared_ptr<api::RemoveReply> reply( - std::dynamic_pointer_cast<api::RemoveReply>( - replies[0])); + CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size()); + auto reply = std::dynamic_pointer_cast<api::RemoveReply>(replies[0]); CPPUNIT_ASSERT(reply.get()); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), - reply->getResult()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), reply->getResult()); } } void VisitorManagerTest::tearDown() { - if (_top.get() != 0) { + if (_top) { + assert(_top->getNumReplies() == 0); _top->close(); _top->flush(); - _top.reset(0); + _top.reset(); } - _node.reset(0); - _messageSessionFactory.reset(0); - _manager = 0; + _node.reset(); + _messageSessionFactory.reset(); + _manager = nullptr; } TestVisitorMessageSession& VisitorManagerTest::getSession(uint32_t n) { // Wait until we have started the visitor - const std::vector<TestVisitorMessageSession*>& sessions( - _messageSessionFactory->_visitorSessions); + const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); while (true) { { vespalib::LockGuard lock(_messageSessionFactory->_accessLock); @@ -276,7 +274,7 @@ VisitorManagerTest::getSession(uint32_t n) throw vespalib::IllegalStateException( "Timed out waiting for visitor session", VESPA_STRLOC); } - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } throw std::logic_error("unreachable"); } @@ -288,7 +286,7 @@ VisitorManagerTest::getMessagesAndReply( std::vector<document::Document::SP >& docs, std::vector<document::DocumentId>& docIds, api::ReturnCode::Result result, - documentapi::Priority::Value priority) + std::optional<Priority::Value> priority) { for (int i = 0; i < expectedCount; i++) { session.waitForMessages(i + 1); @@ -296,8 +294,10 @@ VisitorManagerTest::getMessagesAndReply( { vespalib::MonitorGuard guard(session.getMonitor()); - CPPUNIT_ASSERT_EQUAL(priority, - session.sentMessages[i]->getPriority()); + if (priority) { + CPPUNIT_ASSERT_EQUAL(*priority, + session.sentMessages[i]->getPriority()); + } switch (session.sentMessages[i]->getType()) { case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: @@ -340,8 +340,7 @@ VisitorManagerTest::verifyCreateVisitorReply( CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); CPPUNIT_ASSERT(reply.get()); CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult()); @@ -410,8 +409,7 @@ VisitorManagerTest::testNormalUsage() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setControlDestination("foo/bar"); @@ -436,8 +434,7 @@ VisitorManagerTest::testResending() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setControlDestination("foo/bar"); @@ -486,8 +483,7 @@ VisitorManagerTest::testVisitEmptyBucket() initializeTest(); addSomeRemoves(true); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); @@ -502,8 +498,7 @@ VisitorManagerTest::testMultiBucketVisit() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); for (uint32_t i=0; i<10; ++i) { cmd->addBucketToBeVisited(document::BucketId(16, i)); } @@ -527,8 +522,7 @@ VisitorManagerTest::testNoBuckets() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->setAddress(address); _top->sendDown(cmd); @@ -536,15 +530,12 @@ VisitorManagerTest::testNoBuckets() // Should get one reply; a CreateVisitorReply with error since no // buckets where specified in the CreateVisitorCommand _top->waitForMessages(1, 60); - const msg_ptr_vector replies = _top->getRepliesOnce(); + const msg_ptr_vector replies = _top->getRepliesOnce(); CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>( - replies[0])); + auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(replies[0]); // Verify that cast went ok => it was a CreateVisitorReply message CPPUNIT_ASSERT(reply.get()); - api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, - "No buckets specified"); + api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, "No buckets specified"); CPPUNIT_ASSERT_EQUAL(ret, reply->getResult()); } @@ -553,8 +544,7 @@ void VisitorManagerTest::testVisitPutsAndRemoves() initializeTest(); addSomeRemoves(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->setAddress(address); cmd->setVisitRemoves(); for (uint32_t i=0; i<10; ++i) { @@ -581,9 +571,7 @@ void VisitorManagerTest::testVisitWithTimeframeAndSelection() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", - "testdoctype1.headerval < 2")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval < 2"); cmd->setFromTime(3); cmd->setToTime(8); for (uint32_t i=0; i<10; ++i) { @@ -613,9 +601,8 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", - "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", + "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2"); cmd->setFromTime(3); cmd->setToTime(8); for (uint32_t i=0; i<10; ++i) { @@ -628,11 +615,9 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection() const msg_ptr_vector replies = _top->getRepliesOnce(); CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); - api::StorageReply* reply = dynamic_cast<api::StorageReply*>( - replies.front().get()); + auto* reply = dynamic_cast<api::StorageReply*>(replies.front().get()); CPPUNIT_ASSERT(reply); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, - reply->getResult().getResult()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, reply->getResult().getResult()); } void @@ -641,8 +626,7 @@ VisitorManagerTest::testVisitorCallbacks() initializeTest(); std::ostringstream replydata; api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "TestVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "TestVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->addBucketToBeVisited(document::BucketId(16, 5)); cmd->setAddress(address); @@ -659,8 +643,8 @@ VisitorManagerTest::testVisitorCallbacks() CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_MAPVISITOR, session.sentMessages[i]->getType()); - documentapi::MapVisitorMessage* mapvisitormsg( - static_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get())); + auto* mapvisitormsg = dynamic_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get()); + CPPUNIT_ASSERT(mapvisitormsg != nullptr); replydata << mapvisitormsg->getData().get("msg"); @@ -690,8 +674,7 @@ VisitorManagerTest::testVisitorCleanup() for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << i; - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "InvalidVisitor", ost.str(), "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -703,28 +686,27 @@ VisitorManagerTest::testVisitorCleanup() for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << (i + 10); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); _top->sendDown(cmd); } - - // Should get 14 immediate replies - 10 failures and 4 busy + // Should get 16 immediate replies - 10 failures and 6 busy { - _top->waitForMessages(14, 60); + const int expected_total = 16; + _top->waitForMessages(expected_total, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL(size_t(expected_total), replies.size()); int failures = 0; int busy = 0; - for (uint32_t i=0; i< 14; ++i) { + for (uint32_t i=0; i< expected_total; ++i) { std::shared_ptr<api::StorageMessage> msg(replies[i]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); CPPUNIT_ASSERT(reply.get()); if (i < 10) { @@ -741,9 +723,11 @@ VisitorManagerTest::testVisitorCleanup() } CPPUNIT_ASSERT_EQUAL(10, failures); - CPPUNIT_ASSERT_EQUAL(4, busy); + CPPUNIT_ASSERT_EQUAL(expected_total - 10, busy); } + // 4 pending + // Finish a visitor std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; @@ -753,22 +737,23 @@ VisitorManagerTest::testVisitorCleanup() // Should get a reply for the visitor. verifyCreateVisitorReply(api::ReturnCode::OK); + // 3 pending + // Fail a visitor getMessagesAndReply(1, getSession(1), docs, docIds, api::ReturnCode::INTERNAL_FAILURE); // Should get a reply for the visitor. verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE); - while (_manager->getActiveVisitorCount() > 2) { - FastOS_Thread::Sleep(10); - } + // 2 pending + + CPPUNIT_ASSERT_EQUAL(2u, _manager->getActiveVisitorCount()); // Start a bunch of more visitors for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << (i + 24); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -778,17 +763,26 @@ VisitorManagerTest::testVisitorCleanup() // Should now get 8 busy. _top->waitForMessages(8, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); - CPPUNIT_ASSERT_EQUAL(8, (int)replies.size()); + CPPUNIT_ASSERT_EQUAL(size_t(8), replies.size()); for (uint32_t i=0; i< replies.size(); ++i) { std::shared_ptr<api::StorageMessage> msg(replies[i]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); CPPUNIT_ASSERT(reply.get()); CPPUNIT_ASSERT_EQUAL(api::ReturnCode::BUSY, reply->getResult().getResult()); } + + // 4 still pending, need to clean up our stuff before tearing down. + CPPUNIT_ASSERT_EQUAL(4u, _manager->getActiveVisitorCount()); + + for (uint32_t i = 0; i < 4; ++i) { + getMessagesAndReply(1, getSession(i + 2), docs, docIds); + verifyCreateVisitorReply(api::ReturnCode::OK); + } + + CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); } void @@ -798,18 +792,13 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo() api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); { - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); _top->sendDown(cmd); } - uint32_t visitorRepliesReceived = 0; - uint32_t oki = 0; - uint32_t failed = 0; - std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; @@ -823,37 +812,13 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo() mbus::Reply::UP reply = cmd->createReply(); - CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, session.sentMessages[1]->getType()); + CPPUNIT_ASSERT_EQUAL(uint32_t(documentapi::DocumentProtocol::MESSAGE_VISITORINFO), session.sentMessages[1]->getType()); reply->swapState(*session.sentMessages[1]); reply->setMessage(mbus::Message::UP(session.sentMessages[1].release())); reply->addError(mbus::Error(api::ReturnCode::NOT_CONNECTED, "Me no ready")); session.reply(std::move(reply)); } - - _top->waitForMessages(1, 60); - const msg_ptr_vector replies = _top->getRepliesOnce(); - for (uint32_t i=0; i< replies.size(); ++i) { - std::shared_ptr<api::StorageMessage> msg(replies[i]); - if (msg->getType() == api::MessageType::VISITOR_CREATE_REPLY) - { - ++visitorRepliesReceived; - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); - CPPUNIT_ASSERT(reply.get()); - if (reply->getResult().success()) { - ++oki; - std::cerr << "\n" << reply->toString(true) << "\n"; - } else { - ++failed; - } - } - } - - std::ostringstream errmsg; - errmsg << "oki " << oki << ", failed " << failed; - - CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 0u, oki); - CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 1u, failed); + verifyCreateVisitorReply(api::ReturnCode::NOT_CONNECTED); } void @@ -863,10 +828,8 @@ VisitorManagerTest::testAbortOnFieldPathError() api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); // Use bogus field path to force error to happen - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", - "testvis", - "testdoctype1.headerval{bogus} == 1234")); + auto cmd = std::make_shared<api::CreateVisitorCommand>( + makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234"); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -885,8 +848,7 @@ VisitorManagerTest::testVisitorQueueTimeout() { vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor()); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(1); @@ -897,18 +859,13 @@ VisitorManagerTest::testVisitorQueueTimeout() } // Don't answer any messages. Make sure we timeout anyways. - uint32_t visitorRepliesReceived = 0; - _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); std::shared_ptr<api::StorageMessage> msg(replies[0]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - ++visitorRepliesReceived; - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, - "Visitor timed out in visitor queue"), + auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue"), reply->getResult()); } @@ -918,8 +875,7 @@ VisitorManagerTest::testVisitorProcessingTimeout() initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -932,19 +888,7 @@ VisitorManagerTest::testVisitorProcessingTimeout() _node->getClock().addSecondsToTime(1000); - // Don't answer any messages. Make sure we timeout anyways. - uint32_t visitorRepliesReceived = 0; - - _top->waitForMessages(1, 60); - const msg_ptr_vector replies = _top->getRepliesOnce(); - std::shared_ptr<api::StorageMessage> msg(replies[0]); - - CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - ++visitorRepliesReceived; - std::shared_ptr<api::CreateVisitorReply> reply( - std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, - reply->getResult().getResult()); + verifyCreateVisitorReply(api::ReturnCode::ABORTED); } namespace { @@ -955,8 +899,7 @@ namespace { std::ostringstream ost; ost << "testvis" << ++nextVisitor; api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - std::shared_ptr<api::CreateVisitorCommand> cmd( - new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); + auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(timeout); @@ -1002,14 +945,26 @@ VisitorManagerTest::testPrioritizedVisitorQueing() // Finish the first visitor std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; - getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, - documentapi::Priority::PRI_HIGHEST); + getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, Priority::PRI_HIGHEST); verifyCreateVisitorReply(api::ReturnCode::OK); // We should now start the highest priority visitor. - getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, - documentapi::Priority::PRI_VERY_HIGH); + getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, Priority::PRI_VERY_HIGH); CPPUNIT_ASSERT_EQUAL(ids[9], verifyCreateVisitorReply(api::ReturnCode::OK)); + + // 3 pending, 3 in queue. Clean them up + std::vector<uint32_t> pending_sessions = {1, 2, 3, 5, 6, 7}; + for (auto session : pending_sessions) { + finishAndWaitForVisitorSessionCompletion(session); + } + CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); +} + +void VisitorManagerTest::finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex) { + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + getMessagesAndReply(1, getSession(sessionIndex), docs, docIds, api::ReturnCode::OK, std::optional<Priority::Value>()); + verifyCreateVisitorReply(api::ReturnCode::OK); } void @@ -1116,6 +1071,7 @@ VisitorManagerTest::testPrioritizedMaxConcurrentVisitors() { CPPUNIT_ASSERT(finishedVisitors.find(ids[11]) != finishedVisitors.end()); CPPUNIT_ASSERT(finishedVisitors.find(ids[14]) != finishedVisitors.end()); + CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); } void @@ -1135,6 +1091,9 @@ VisitorManagerTest::testVisitorQueingZeroQueueSize() { sendCreateVisitor(1000, *_top, 100 - i); verifyCreateVisitorReply(api::ReturnCode::BUSY); } + for (uint32_t session = 0; session < 4; ++session) { + finishAndWaitForVisitorSessionCompletion(session); + } } void @@ -1148,8 +1107,10 @@ VisitorManagerTest::testStatusPage() { sendCreateVisitor(1000000, *_top, 1); sendCreateVisitor(1000000, *_top, 128); - TestVisitorMessageSession& session = getSession(0); - session.waitForMessages(1); + { + TestVisitorMessageSession& session = getSession(0); + session.waitForMessages(1); + } std::ostringstream ss; static_cast<framework::HtmlStatusReporter&>(*_manager).reportHtmlStatus(ss, path); @@ -1162,6 +1123,10 @@ VisitorManagerTest::testStatusPage() { CPPUNIT_ASSERT(str.find("Visitor thread 0") != std::string::npos); CPPUNIT_ASSERT(str.find("Disconnected visitor timeout") != std::string::npos); // verbose per thread CPPUNIT_ASSERT(str.find("Message #1 <b>putdocumentmessage</b>") != std::string::npos); // 1 active + + for (uint32_t session = 0; session < 2 ; ++session){ + finishAndWaitForVisitorSessionCompletion(session); + } } } |