summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-05-31 20:35:39 +0200
committerGitHub <noreply@github.com>2018-05-31 20:35:39 +0200
commitb17c853d5f593585eb50a794a3824951b4032278 (patch)
treebb0d30877805eb41e425ad3202091ea96624bd48 /storage
parente601f573e2f2351525343a5cf3524b4bf67baa8a (diff)
parent1ecc3ec1e66b68971845615d5bdaa4a97f3cb88b (diff)
Merge pull request #6035 from vespa-engine/vekterli/ensure-visitormanager-tests-clean-up-after-themselves
Ensure visitormanager tests clean up after themselves
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp261
1 files changed, 113 insertions, 148 deletions
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 8b17e851868..9eb1e6d2311 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -19,9 +19,13 @@
#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
#include <vespa/documentapi/messagebus/messages/visitor.h>
#include <vespa/config/common/exceptions.h>
+#include <optional>
+#include <thread>
+#include <chrono>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
+using documentapi::Priority;
namespace storage {
namespace {
@@ -78,8 +82,9 @@ public:
std::vector<document::Document::SP >& docs,
std::vector<document::DocumentId>& docIds,
api::ReturnCode::Result returnCode = api::ReturnCode::OK,
- documentapi::Priority::Value priority = documentapi::Priority::PRI_NORMAL_4);
+ std::optional<Priority::Value> priority = documentapi::Priority::PRI_NORMAL_4);
uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs);
+ void finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex);
void testNormalUsage();
void testResending();
@@ -185,8 +190,7 @@ VisitorManagerTest::initializeTest()
for (uint32_t i=0; i<10; ++i) {
document::BucketId bid(16, i);
- std::shared_ptr<api::CreateBucketCommand> cmd(
- new api::CreateBucketCommand(makeDocumentBucket(bid)));
+ auto cmd = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bid));
cmd->setAddress(address);
cmd->setSourceIndex(0);
_top->sendDown(cmd);
@@ -202,8 +206,7 @@ VisitorManagerTest::initializeTest()
for (uint32_t i=0; i<docCount; ++i) {
document::BucketId bid(16, i);
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(bid), _documents[i], i+1));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bid), _documents[i], i+1);
cmd->setAddress(address);
_top->sendDown(cmd);
_top->waitForMessages(1, 60);
@@ -226,45 +229,40 @@ VisitorManagerTest::addSomeRemoves(bool removeAll)
for (uint32_t i=0; i<docCount; i += (removeAll ? 1 : 4)) {
// Add it to the database
document::BucketId bid(16, i % 10);
- std::shared_ptr<api::RemoveCommand> cmd(
- new api::RemoveCommand(
- makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1));
+ auto cmd = std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(bid), _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1);
cmd->setAddress(address);
_top->sendDown(cmd);
_top->waitForMessages(1, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
- CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
- std::shared_ptr<api::RemoveReply> reply(
- std::dynamic_pointer_cast<api::RemoveReply>(
- replies[0]));
+ CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size());
+ auto reply = std::dynamic_pointer_cast<api::RemoveReply>(replies[0]);
CPPUNIT_ASSERT(reply.get());
- CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK),
- reply->getResult());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), reply->getResult());
}
}
void
VisitorManagerTest::tearDown()
{
- if (_top.get() != 0) {
+ if (_top) {
+ assert(_top->getNumReplies() == 0);
_top->close();
_top->flush();
- _top.reset(0);
+ _top.reset();
}
- _node.reset(0);
- _messageSessionFactory.reset(0);
- _manager = 0;
+ _node.reset();
+ _messageSessionFactory.reset();
+ _manager = nullptr;
}
TestVisitorMessageSession&
VisitorManagerTest::getSession(uint32_t n)
{
// Wait until we have started the visitor
- const std::vector<TestVisitorMessageSession*>& sessions(
- _messageSessionFactory->_visitorSessions);
+ const std::vector<TestVisitorMessageSession*>& sessions(_messageSessionFactory->_visitorSessions);
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ framework::MilliSecTime endTime(clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
while (true) {
{
vespalib::LockGuard lock(_messageSessionFactory->_accessLock);
@@ -276,7 +274,7 @@ VisitorManagerTest::getSession(uint32_t n)
throw vespalib::IllegalStateException(
"Timed out waiting for visitor session", VESPA_STRLOC);
}
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
throw std::logic_error("unreachable");
}
@@ -288,7 +286,7 @@ VisitorManagerTest::getMessagesAndReply(
std::vector<document::Document::SP >& docs,
std::vector<document::DocumentId>& docIds,
api::ReturnCode::Result result,
- documentapi::Priority::Value priority)
+ std::optional<Priority::Value> priority)
{
for (int i = 0; i < expectedCount; i++) {
session.waitForMessages(i + 1);
@@ -296,8 +294,10 @@ VisitorManagerTest::getMessagesAndReply(
{
vespalib::MonitorGuard guard(session.getMonitor());
- CPPUNIT_ASSERT_EQUAL(priority,
- session.sentMessages[i]->getPriority());
+ if (priority) {
+ CPPUNIT_ASSERT_EQUAL(*priority,
+ session.sentMessages[i]->getPriority());
+ }
switch (session.sentMessages[i]->getType()) {
case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
@@ -340,8 +340,7 @@ VisitorManagerTest::verifyCreateVisitorReply(
CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg);
CPPUNIT_ASSERT(reply.get());
CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult());
@@ -410,8 +409,7 @@ VisitorManagerTest::testNormalUsage()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setControlDestination("foo/bar");
@@ -436,8 +434,7 @@ VisitorManagerTest::testResending()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setControlDestination("foo/bar");
@@ -486,8 +483,7 @@ VisitorManagerTest::testVisitEmptyBucket()
initializeTest();
addSomeRemoves(true);
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
@@ -502,8 +498,7 @@ VisitorManagerTest::testMultiBucketVisit()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
for (uint32_t i=0; i<10; ++i) {
cmd->addBucketToBeVisited(document::BucketId(16, i));
}
@@ -527,8 +522,7 @@ VisitorManagerTest::testNoBuckets()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->setAddress(address);
_top->sendDown(cmd);
@@ -536,15 +530,12 @@ VisitorManagerTest::testNoBuckets()
// Should get one reply; a CreateVisitorReply with error since no
// buckets where specified in the CreateVisitorCommand
_top->waitForMessages(1, 60);
- const msg_ptr_vector replies = _top->getRepliesOnce();
+ const msg_ptr_vector replies = _top->getRepliesOnce();
CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(
- replies[0]));
+ auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(replies[0]);
// Verify that cast went ok => it was a CreateVisitorReply message
CPPUNIT_ASSERT(reply.get());
- api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS,
- "No buckets specified");
+ api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, "No buckets specified");
CPPUNIT_ASSERT_EQUAL(ret, reply->getResult());
}
@@ -553,8 +544,7 @@ void VisitorManagerTest::testVisitPutsAndRemoves()
initializeTest();
addSomeRemoves();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->setAddress(address);
cmd->setVisitRemoves();
for (uint32_t i=0; i<10; ++i) {
@@ -581,9 +571,7 @@ void VisitorManagerTest::testVisitWithTimeframeAndSelection()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis",
- "testdoctype1.headerval < 2"));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval < 2");
cmd->setFromTime(3);
cmd->setToTime(8);
for (uint32_t i=0; i<10; ++i) {
@@ -613,9 +601,8 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection()
{
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis",
- "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2"));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis",
+ "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2");
cmd->setFromTime(3);
cmd->setToTime(8);
for (uint32_t i=0; i<10; ++i) {
@@ -628,11 +615,9 @@ void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection()
const msg_ptr_vector replies = _top->getRepliesOnce();
CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
- api::StorageReply* reply = dynamic_cast<api::StorageReply*>(
- replies.front().get());
+ auto* reply = dynamic_cast<api::StorageReply*>(replies.front().get());
CPPUNIT_ASSERT(reply);
- CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS,
- reply->getResult().getResult());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, reply->getResult().getResult());
}
void
@@ -641,8 +626,7 @@ VisitorManagerTest::testVisitorCallbacks()
initializeTest();
std::ostringstream replydata;
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "TestVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "TestVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->addBucketToBeVisited(document::BucketId(16, 5));
cmd->setAddress(address);
@@ -659,8 +643,8 @@ VisitorManagerTest::testVisitorCallbacks()
CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_MAPVISITOR, session.sentMessages[i]->getType());
- documentapi::MapVisitorMessage* mapvisitormsg(
- static_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get()));
+ auto* mapvisitormsg = dynamic_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get());
+ CPPUNIT_ASSERT(mapvisitormsg != nullptr);
replydata << mapvisitormsg->getData().get("msg");
@@ -690,8 +674,7 @@ VisitorManagerTest::testVisitorCleanup()
for (uint32_t i=0; i<10; ++i) {
std::ostringstream ost;
ost << "testvis" << i;
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "InvalidVisitor", ost.str(), ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "InvalidVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
@@ -703,28 +686,27 @@ VisitorManagerTest::testVisitorCleanup()
for (uint32_t i=0; i<10; ++i) {
std::ostringstream ost;
ost << "testvis" << (i + 10);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
_top->sendDown(cmd);
}
-
- // Should get 14 immediate replies - 10 failures and 4 busy
+ // Should get 16 immediate replies - 10 failures and 6 busy
{
- _top->waitForMessages(14, 60);
+ const int expected_total = 16;
+ _top->waitForMessages(expected_total, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL(size_t(expected_total), replies.size());
int failures = 0;
int busy = 0;
- for (uint32_t i=0; i< 14; ++i) {
+ for (uint32_t i=0; i< expected_total; ++i) {
std::shared_ptr<api::StorageMessage> msg(replies[i]);
CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg);
CPPUNIT_ASSERT(reply.get());
if (i < 10) {
@@ -741,9 +723,11 @@ VisitorManagerTest::testVisitorCleanup()
}
CPPUNIT_ASSERT_EQUAL(10, failures);
- CPPUNIT_ASSERT_EQUAL(4, busy);
+ CPPUNIT_ASSERT_EQUAL(expected_total - 10, busy);
}
+ // 4 pending
+
// Finish a visitor
std::vector<document::Document::SP > docs;
std::vector<document::DocumentId> docIds;
@@ -753,22 +737,23 @@ VisitorManagerTest::testVisitorCleanup()
// Should get a reply for the visitor.
verifyCreateVisitorReply(api::ReturnCode::OK);
+ // 3 pending
+
// Fail a visitor
getMessagesAndReply(1, getSession(1), docs, docIds, api::ReturnCode::INTERNAL_FAILURE);
// Should get a reply for the visitor.
verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE);
- while (_manager->getActiveVisitorCount() > 2) {
- FastOS_Thread::Sleep(10);
- }
+ // 2 pending
+
+ CPPUNIT_ASSERT_EQUAL(2u, _manager->getActiveVisitorCount());
// Start a bunch of more visitors
for (uint32_t i=0; i<10; ++i) {
std::ostringstream ost;
ost << "testvis" << (i + 24);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
@@ -778,17 +763,26 @@ VisitorManagerTest::testVisitorCleanup()
// Should now get 8 busy.
_top->waitForMessages(8, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
- CPPUNIT_ASSERT_EQUAL(8, (int)replies.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(8), replies.size());
for (uint32_t i=0; i< replies.size(); ++i) {
std::shared_ptr<api::StorageMessage> msg(replies[i]);
CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg);
CPPUNIT_ASSERT(reply.get());
CPPUNIT_ASSERT_EQUAL(api::ReturnCode::BUSY, reply->getResult().getResult());
}
+
+ // 4 still pending, need to clean up our stuff before tearing down.
+ CPPUNIT_ASSERT_EQUAL(4u, _manager->getActiveVisitorCount());
+
+ for (uint32_t i = 0; i < 4; ++i) {
+ getMessagesAndReply(1, getSession(i + 2), docs, docIds);
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+ }
+
+ CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount());
}
void
@@ -798,18 +792,13 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo()
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
{
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
_top->sendDown(cmd);
}
- uint32_t visitorRepliesReceived = 0;
- uint32_t oki = 0;
- uint32_t failed = 0;
-
std::vector<document::Document::SP > docs;
std::vector<document::DocumentId> docIds;
@@ -823,37 +812,13 @@ VisitorManagerTest::testAbortOnFailedVisitorInfo()
mbus::Reply::UP reply = cmd->createReply();
- CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, session.sentMessages[1]->getType());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(documentapi::DocumentProtocol::MESSAGE_VISITORINFO), session.sentMessages[1]->getType());
reply->swapState(*session.sentMessages[1]);
reply->setMessage(mbus::Message::UP(session.sentMessages[1].release()));
reply->addError(mbus::Error(api::ReturnCode::NOT_CONNECTED, "Me no ready"));
session.reply(std::move(reply));
}
-
- _top->waitForMessages(1, 60);
- const msg_ptr_vector replies = _top->getRepliesOnce();
- for (uint32_t i=0; i< replies.size(); ++i) {
- std::shared_ptr<api::StorageMessage> msg(replies[i]);
- if (msg->getType() == api::MessageType::VISITOR_CREATE_REPLY)
- {
- ++visitorRepliesReceived;
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
- CPPUNIT_ASSERT(reply.get());
- if (reply->getResult().success()) {
- ++oki;
- std::cerr << "\n" << reply->toString(true) << "\n";
- } else {
- ++failed;
- }
- }
- }
-
- std::ostringstream errmsg;
- errmsg << "oki " << oki << ", failed " << failed;
-
- CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 0u, oki);
- CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 1u, failed);
+ verifyCreateVisitorReply(api::ReturnCode::NOT_CONNECTED);
}
void
@@ -863,10 +828,8 @@ VisitorManagerTest::testAbortOnFieldPathError()
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
// Use bogus field path to force error to happen
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor",
- "testvis",
- "testdoctype1.headerval{bogus} == 1234"));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(
+ makeBucketSpace(), "DumpVisitor", "testvis", "testdoctype1.headerval{bogus} == 1234");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
@@ -885,8 +848,7 @@ VisitorManagerTest::testVisitorQueueTimeout()
{
vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor());
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(1);
@@ -897,18 +859,13 @@ VisitorManagerTest::testVisitorQueueTimeout()
}
// Don't answer any messages. Make sure we timeout anyways.
- uint32_t visitorRepliesReceived = 0;
-
_top->waitForMessages(1, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
std::shared_ptr<api::StorageMessage> msg(replies[0]);
CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
- ++visitorRepliesReceived;
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
- CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY,
- "Visitor timed out in visitor queue"),
+ auto reply = std::dynamic_pointer_cast<api::CreateVisitorReply>(msg);
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue"),
reply->getResult());
}
@@ -918,8 +875,7 @@ VisitorManagerTest::testVisitorProcessingTimeout()
initializeTest();
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", "testvis", ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(0);
@@ -932,19 +888,7 @@ VisitorManagerTest::testVisitorProcessingTimeout()
_node->getClock().addSecondsToTime(1000);
- // Don't answer any messages. Make sure we timeout anyways.
- uint32_t visitorRepliesReceived = 0;
-
- _top->waitForMessages(1, 60);
- const msg_ptr_vector replies = _top->getRepliesOnce();
- std::shared_ptr<api::StorageMessage> msg(replies[0]);
-
- CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
- ++visitorRepliesReceived;
- std::shared_ptr<api::CreateVisitorReply> reply(
- std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
- CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
- reply->getResult().getResult());
+ verifyCreateVisitorReply(api::ReturnCode::ABORTED);
}
namespace {
@@ -955,8 +899,7 @@ namespace {
std::ostringstream ost;
ost << "testvis" << ++nextVisitor;
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- new api::CreateVisitorCommand(makeBucketSpace(), "DumpVisitor", ost.str(), ""));
+ auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", ost.str(), "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(address);
cmd->setQueueTimeout(timeout);
@@ -1002,14 +945,26 @@ VisitorManagerTest::testPrioritizedVisitorQueing()
// Finish the first visitor
std::vector<document::Document::SP > docs;
std::vector<document::DocumentId> docIds;
- getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK,
- documentapi::Priority::PRI_HIGHEST);
+ getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, Priority::PRI_HIGHEST);
verifyCreateVisitorReply(api::ReturnCode::OK);
// We should now start the highest priority visitor.
- getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK,
- documentapi::Priority::PRI_VERY_HIGH);
+ getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, Priority::PRI_VERY_HIGH);
CPPUNIT_ASSERT_EQUAL(ids[9], verifyCreateVisitorReply(api::ReturnCode::OK));
+
+ // 3 pending, 3 in queue. Clean them up
+ std::vector<uint32_t> pending_sessions = {1, 2, 3, 5, 6, 7};
+ for (auto session : pending_sessions) {
+ finishAndWaitForVisitorSessionCompletion(session);
+ }
+ CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount());
+}
+
+void VisitorManagerTest::finishAndWaitForVisitorSessionCompletion(uint32_t sessionIndex) {
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+ getMessagesAndReply(1, getSession(sessionIndex), docs, docIds, api::ReturnCode::OK, std::optional<Priority::Value>());
+ verifyCreateVisitorReply(api::ReturnCode::OK);
}
void
@@ -1116,6 +1071,7 @@ VisitorManagerTest::testPrioritizedMaxConcurrentVisitors() {
CPPUNIT_ASSERT(finishedVisitors.find(ids[11]) != finishedVisitors.end());
CPPUNIT_ASSERT(finishedVisitors.find(ids[14]) != finishedVisitors.end());
+ CPPUNIT_ASSERT_EQUAL(0u, _manager->getActiveVisitorCount());
}
void
@@ -1135,6 +1091,9 @@ VisitorManagerTest::testVisitorQueingZeroQueueSize() {
sendCreateVisitor(1000, *_top, 100 - i);
verifyCreateVisitorReply(api::ReturnCode::BUSY);
}
+ for (uint32_t session = 0; session < 4; ++session) {
+ finishAndWaitForVisitorSessionCompletion(session);
+ }
}
void
@@ -1148,8 +1107,10 @@ VisitorManagerTest::testStatusPage() {
sendCreateVisitor(1000000, *_top, 1);
sendCreateVisitor(1000000, *_top, 128);
- TestVisitorMessageSession& session = getSession(0);
- session.waitForMessages(1);
+ {
+ TestVisitorMessageSession& session = getSession(0);
+ session.waitForMessages(1);
+ }
std::ostringstream ss;
static_cast<framework::HtmlStatusReporter&>(*_manager).reportHtmlStatus(ss, path);
@@ -1162,6 +1123,10 @@ VisitorManagerTest::testStatusPage() {
CPPUNIT_ASSERT(str.find("Visitor thread 0") != std::string::npos);
CPPUNIT_ASSERT(str.find("Disconnected visitor timeout") != std::string::npos); // verbose per thread
CPPUNIT_ASSERT(str.find("Message #1 <b>putdocumentmessage</b>") != std::string::npos); // 1 active
+
+ for (uint32_t session = 0; session < 2 ; ++session){
+ finishAndWaitForVisitorSessionCompletion(session);
+ }
}
}