diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-05-31 23:20:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-31 23:20:22 +0200 |
commit | 3cbd07efd93415e92d465dd10c65046dd566835a (patch) | |
tree | fa3a54b2326f1c95f1e15b491e1e84ce24a8fa3e /storage/src | |
parent | d1e6b20cc515c9fac69d2349cb0c59fa6d2fff2a (diff) |
Revert "Ensure visitormanager tests clean up after themselves"
Diffstat (limited to 'storage/src')
-rw-r--r-- | storage/src/tests/visiting/visitormanagertest.cpp | 261 |
1 files changed, 148 insertions, 113 deletions
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 9eb1e6d2311..8b17e851868 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -19,13 +19,9 @@ #include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> #include <vespa/documentapi/messagebus/messages/visitor.h> #include <vespa/config/common/exceptions.h> -#include <optional> -#include <thread> -#include <chrono> using document::test::makeDocumentBucket; using document::test::makeBucketSpace; -using documentapi::Priority; namespace storage { namespace { @@ -82,9 +78,8 @@ public: std::vector<document::Document::SP >& docs, std::vector<document::DocumentId>& docIds, api::ReturnCode::Result returnCode = api::ReturnCode::OK, - std::optional<Priority::Value> priority = documentapi::Priority::PRI_NORMAL_4); + documentapi::Priority::Value priority = documentapi::Priority::PRI_NORMAL_4); uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs); - void finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex); void testNormalUsage(); void testResending(); @@ -190,7 +185,8 @@ VisitorManagerTest::initializeTest() for (uint32_t i=0; i<10; ++i) { document::BucketId bid(16, i); - auto cmd = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bid)); + std::shared_ptr<api::CreateBucketCommand> cmd( + new api::CreateBucketCommand(makeDocumentBucket(bid))); cmd->setAddress(address); cmd->setSourceIndex(0); _top->sendDown(cmd); @@ -206,7 +202,8 @@ VisitorManagerTest::initializeTest() for (uint32_t i=0; i<docCount; ++i) { document::BucketId bid(16, i); - auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bid), _documents[i], i+1); + std::shared_ptr<api::PutCommand> cmd( + new api::PutCommand(makeDocumentBucket(bid), _documents[i], i+1)); cmd->setAddress(address); _top->sendDown(cmd); _top->waitForMessages(1, 60); @@ -229,40 +226,45 @@ VisitorManagerTest::addSomeRemoves(bool removeAll) for (uint32_t i=0; i<docCount; i += (removeAll ? 1 : 4)) { // Add it to the database document::BucketId bid(16, i % 10); - auto cmd = std::make_shared<api::RemoveCommand>( - makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1); + std::shared_ptr<api::RemoveCommand> cmd( + new api::RemoveCommand( + makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1)); cmd->setAddress(address); _top->sendDown(cmd); _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); - CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size()); - auto reply = std::dynamic_pointer_cast<api::RemoveReply>(replies[0]); + CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); + std::shared_ptr<api::RemoveReply> reply( + std::dynamic_pointer_cast<api::RemoveReply>( + replies[0])); CPPUNIT_ASSERT(reply.get()); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), reply->getResult()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), + reply->getResult()); } } void VisitorManagerTest::tearDown() { - if (_top) { - assert(_top->getNumReplies() == 0); + if (_top.get() != 0) { _top->close(); _top->flush(); - _top.reset(); + _top.reset(0); } - _node.reset(); - _messageSessionFactory.reset(); - _manager = nullptr; + _node.reset(0); + _messageSessionFactory.reset(0); + _manager = 0; } TestVisitorMessageSession& VisitorManagerTest::getSession(uint32_t n) { // Wait until we have started the visitor - const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions); + const std::vector<TestVisitorMessageSession*>& sessions( + _messageSessionFactory->_visitorSessions); framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); while (true) { { vespalib::LockGuard lock(_messageSessionFactory->_accessLock); @@ -274,7 +276,7 @@ VisitorManagerTest::getSession(uint32_t n) throw vespalib::IllegalStateException( "Timed out waiting for visitor session", VESPA_STRLOC); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + FastOS_Thread::Sleep(10); } throw std::logic_error("unreachable"); } @@ -286,7 +288,7 @@ VisitorManagerTest::getMessagesAndReply( std::vector<document::Document::SP >& docs, std::vector<document::DocumentId>& docIds, api::ReturnCode::Result result, - std::optional<Priority::Value> priority) + documentapi::Priority::Value priority) { for (int i = 0; i < expectedCount; i++) { session.waitForMessages(i + 1); @@ -294,10 +296,8 @@ VisitorManagerTest::getMessagesAndReply( { vespalib::MonitorGuard guard(session.getMonitor()); - if (priority) { - CPPUNIT_ASSERT_EQUAL(*priority, - session.sentMessages[i]->getPriority()); - } + CPPUNIT_ASSERT_EQUAL(priority, + session.sentMessages[i]->getPriority()); switch (session.sentMessages[i]->getType()) { case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: @@ -340,7 +340,8 @@ VisitorManagerTest::verifyCreateVisitorReply( CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); CPPUNIT_ASSERT(reply.get()); CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult()); @@ -409,7 +410,8 @@ VisitorManagerTest::testNormalUsage() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setControlDestination("foo/bar"); @@ -434,7 +436,8 @@ VisitorManagerTest::testResending() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setControlDestination("foo/bar"); @@ -483,7 +486,8 @@ VisitorManagerTest::testVisitEmptyBucket() initializeTest(); addSomeRemoves(true); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); @@ -498,7 +502,8 @@ VisitorManagerTest::testMultiBucketVisit() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); for (uint32_t i=0; i<10; ++i) { cmd->addBucketToBeVisited(document::BucketId(16, i)); } @@ -522,7 +527,8 @@ VisitorManagerTest::testNoBuckets() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->setAddress(address); _top->sendDown(cmd); @@ -530,12 +536,15 @@ VisitorManagerTest::testNoBuckets() // Should get one reply; a CreateVisitorReply with error since no // buckets where specified in the CreateVisitorCommand _top->waitForMessages(1, 60); - const msg_ptr_vector replies = _top->getRepliesOnce(); + const msg_ptr_vector replies = _top->getRepliesOnce(); CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); - auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(replies[0]); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>( + replies[0])); // Verify that cast went ok => it was a CreateVisitorReply message CPPUNIT_ASSERT(reply.get()); - api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, "No buckets specified"); + api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, + "No buckets specified"); CPPUNIT_ASSERT_EQUAL(ret, reply->getResult()); } @@ -544,7 +553,8 @@ void VisitorManagerTest::testVisitPutsAndRemoves() initializeTest(); addSomeRemoves(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->setAddress(address); cmd->setVisitRemoves(); for (uint32_t i=0; i<10; ++i) { @@ -571,7 +581,9 @@ void VisitorManagerTest::testVisitWithTimeframeAndSelection() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval < 2"); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", + "testdoctype1.headerval < 2")); cmd->setFromTime(3); cmd->setToTime(8); for (uint32_t i=0; i<10; ++i) { @@ -601,8 +613,9 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection() { initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", - "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2"); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", + "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2")); cmd->setFromTime(3); cmd->setToTime(8); for (uint32_t i=0; i<10; ++i) { @@ -615,9 +628,11 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection() const msg_ptr_vector replies = _top->getRepliesOnce(); CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); - auto* reply = dynamic_cast<api::StorageReply*>(replies.front().get()); + api::StorageReply* reply = dynamic_cast<api::StorageReply*>( + replies.front().get()); CPPUNIT_ASSERT(reply); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, reply->getResult().getResult()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, + reply->getResult().getResult()); } void @@ -626,7 +641,8 @@ VisitorManagerTest::testVisitorCallbacks() initializeTest(); std::ostringstream replydata; api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "TestVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "TestVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->addBucketToBeVisited(document::BucketId(16, 5)); cmd->setAddress(address); @@ -643,8 +659,8 @@ VisitorManagerTest::testVisitorCallbacks() CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_MAPVISITOR, session.sentMessages[i]->getType()); - auto* mapvisitormsg = dynamic_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get()); - CPPUNIT_ASSERT(mapvisitormsg != nullptr); + documentapi::MapVisitorMessage* mapvisitormsg( + static_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get())); replydata << mapvisitormsg->getData().get("msg"); @@ -674,7 +690,8 @@ VisitorManagerTest::testVisitorCleanup() for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << i; - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "InvalidVisitor", ost.str(), "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -686,27 +703,28 @@ VisitorManagerTest::testVisitorCleanup() for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << (i + 10); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); _top->sendDown(cmd); } - // Should get 16 immediate replies - 10 failures and 6 busy + + // Should get 14 immediate replies - 10 failures and 4 busy { - const int expected_total = 16; - _top->waitForMessages(expected_total, 60); + _top->waitForMessages(14, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); - CPPUNIT_ASSERT_EQUAL(size_t(expected_total), replies.size()); int failures = 0; int busy = 0; - for (uint32_t i=0; i< expected_total; ++i) { + for (uint32_t i=0; i< 14; ++i) { std::shared_ptr<api::StorageMessage> msg(replies[i]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); CPPUNIT_ASSERT(reply.get()); if (i < 10) { @@ -723,11 +741,9 @@ VisitorManagerTest::testVisitorCleanup() } CPPUNIT_ASSERT_EQUAL(10, failures); - CPPUNIT_ASSERT_EQUAL(expected_total - 10, busy); + CPPUNIT_ASSERT_EQUAL(4, busy); } - // 4 pending - // Finish a visitor std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; @@ -737,23 +753,22 @@ VisitorManagerTest::testVisitorCleanup() // Should get a reply for the visitor. verifyCreateVisitorReply(api::ReturnCode::OK); - // 3 pending - // Fail a visitor getMessagesAndReply(1, getSession(1), docs, docIds, api::ReturnCode::INTERNAL_FAILURE); // Should get a reply for the visitor. verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE); - // 2 pending - - CPPUNIT_ASSERT_EQUAL(2u, _manager->getActiveVisitorCount()); + while (_manager->getActiveVisitorCount() > 2) { + FastOS_Thread::Sleep(10); + } // Start a bunch of more visitors for (uint32_t i=0; i<10; ++i) { std::ostringstream ost; ost << "testvis" << (i + 24); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -763,26 +778,17 @@ VisitorManagerTest::testVisitorCleanup() // Should now get 8 busy. _top->waitForMessages(8, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); - CPPUNIT_ASSERT_EQUAL(size_t(8), replies.size()); + CPPUNIT_ASSERT_EQUAL(8, (int)replies.size()); for (uint32_t i=0; i< replies.size(); ++i) { std::shared_ptr<api::StorageMessage> msg(replies[i]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); CPPUNIT_ASSERT(reply.get()); CPPUNIT_ASSERT_EQUAL(api::ReturnCode::BUSY, reply->getResult().getResult()); } - - // 4 still pending, need to clean up our stuff before tearing down. - CPPUNIT_ASSERT_EQUAL(4u, _manager->getActiveVisitorCount()); - - for (uint32_t i = 0; i < 4; ++i) { - getMessagesAndReply(1, getSession(i + 2), docs, docIds); - verifyCreateVisitorReply(api::ReturnCode::OK); - } - - CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); } void @@ -792,13 +798,18 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo() api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); { - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); _top->sendDown(cmd); } + uint32_t visitorRepliesReceived = 0; + uint32_t oki = 0; + uint32_t failed = 0; + std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; @@ -812,13 +823,37 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo() mbus::Reply::UP reply = cmd->createReply(); - CPPUNIT_ASSERT_EQUAL(uint32_t(documentapi::DocumentProtocol::MESSAGE_VISITORINFO), session.sentMessages[1]->getType()); + CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, session.sentMessages[1]->getType()); reply->swapState(*session.sentMessages[1]); reply->setMessage(mbus::Message::UP(session.sentMessages[1].release())); reply->addError(mbus::Error(api::ReturnCode::NOT_CONNECTED, "Me no ready")); session.reply(std::move(reply)); } - verifyCreateVisitorReply(api::ReturnCode::NOT_CONNECTED); + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + for (uint32_t i=0; i< replies.size(); ++i) { + std::shared_ptr<api::StorageMessage> msg(replies[i]); + if (msg->getType() == api::MessageType::VISITOR_CREATE_REPLY) + { + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + if (reply->getResult().success()) { + ++oki; + std::cerr << "\n" << reply->toString(true) << "\n"; + } else { + ++failed; + } + } + } + + std::ostringstream errmsg; + errmsg << "oki " << oki << ", failed " << failed; + + CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 0u, oki); + CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 1u, failed); } void @@ -828,8 +863,10 @@ VisitorManagerTest::testAbortOnFieldPathError() api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); // Use bogus field path to force error to happen - auto cmd = std::make_shared<api::CreateVisitorCommand>( - makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234"); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", + "testvis", + "testdoctype1.headerval{bogus} == 1234")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -848,7 +885,8 @@ VisitorManagerTest::testVisitorQueueTimeout() { vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor()); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(1); @@ -859,13 +897,18 @@ VisitorManagerTest::testVisitorQueueTimeout() } // Don't answer any messages. Make sure we timeout anyways. + uint32_t visitorRepliesReceived = 0; + _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); std::shared_ptr<api::StorageMessage> msg(replies[0]); CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); - auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); - CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue"), + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, + "Visitor timed out in visitor queue"), reply->getResult()); } @@ -875,7 +918,8 @@ VisitorManagerTest::testVisitorProcessingTimeout() initializeTest(); api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(0); @@ -888,7 +932,19 @@ VisitorManagerTest::testVisitorProcessingTimeout() _node->getClock().addSecondsToTime(1000); - verifyCreateVisitorReply(api::ReturnCode::ABORTED); + // Don't answer any messages. Make sure we timeout anyways. + uint32_t visitorRepliesReceived = 0; + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + reply->getResult().getResult()); } namespace { @@ -899,7 +955,8 @@ namespace { std::ostringstream ost; ost << "testvis" << ++nextVisitor; api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), ""); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), "")); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(address); cmd->setQueueTimeout(timeout); @@ -945,26 +1002,14 @@ VisitorManagerTest::testPrioritizedVisitorQueing() // Finish the first visitor std::vector<document::Document::SP > docs; std::vector<document::DocumentId> docIds; - getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, Priority::PRI_HIGHEST); + getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, + documentapi::Priority::PRI_HIGHEST); verifyCreateVisitorReply(api::ReturnCode::OK); // We should now start the highest priority visitor. - getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, Priority::PRI_VERY_HIGH); + getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, + documentapi::Priority::PRI_VERY_HIGH); CPPUNIT_ASSERT_EQUAL(ids[9], verifyCreateVisitorReply(api::ReturnCode::OK)); - - // 3 pending, 3 in queue. Clean them up - std::vector<uint32_t> pending_sessions = {1, 2, 3, 5, 6, 7}; - for (auto session : pending_sessions) { - finishAndWaitForVisitorSessionCompletion(session); - } - CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); -} - -void VisitorManagerTest::finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex) { - std::vector<document::Document::SP > docs; - std::vector<document::DocumentId> docIds; - getMessagesAndReply(1, getSession(sessionIndex), docs, docIds, api::ReturnCode::OK, std::optional<Priority::Value>()); - verifyCreateVisitorReply(api::ReturnCode::OK); } void @@ -1071,7 +1116,6 @@ VisitorManagerTest::testPrioritizedMaxConcurrentVisitors() { CPPUNIT_ASSERT(finishedVisitors.find(ids[11]) != finishedVisitors.end()); CPPUNIT_ASSERT(finishedVisitors.find(ids[14]) != finishedVisitors.end()); - CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount()); } void @@ -1091,9 +1135,6 @@ VisitorManagerTest::testVisitorQueingZeroQueueSize() { sendCreateVisitor(1000, *_top, 100 - i); verifyCreateVisitorReply(api::ReturnCode::BUSY); } - for (uint32_t session = 0; session < 4; ++session) { - finishAndWaitForVisitorSessionCompletion(session); - } } void @@ -1107,10 +1148,8 @@ VisitorManagerTest::testStatusPage() { sendCreateVisitor(1000000, *_top, 1); sendCreateVisitor(1000000, *_top, 128); - { - TestVisitorMessageSession& session = getSession(0); - session.waitForMessages(1); - } + TestVisitorMessageSession& session = getSession(0); + session.waitForMessages(1); std::ostringstream ss; static_cast<framework::HtmlStatusReporter&>(*_manager).reportHtmlStatus(ss, path); @@ -1123,10 +1162,6 @@ VisitorManagerTest::testStatusPage() { CPPUNIT_ASSERT(str.find("Visitor thread 0") != std::string::npos); CPPUNIT_ASSERT(str.find("Disconnected visitor timeout") != std::string::npos); // verbose per thread CPPUNIT_ASSERT(str.find("Message #1 <b>putdocumentmessage</b>") != std::string::npos); // 1 active - - for (uint32_t session = 0; session < 2 ; ++session){ - finishAndWaitForVisitorSessionCompletion(session); - } } } |