diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/visiting |
Publish
Diffstat (limited to 'storage/src/tests/visiting')
-rw-r--r-- | storage/src/tests/visiting/.gitignore | 12 | ||||
-rw-r--r-- | storage/src/tests/visiting/CMakeLists.txt | 11 | ||||
-rw-r--r-- | storage/src/tests/visiting/commandqueuetest.cpp | 223 | ||||
-rw-r--r-- | storage/src/tests/visiting/memory_bounded_trace_test.cpp | 131 | ||||
-rw-r--r-- | storage/src/tests/visiting/visitormanagertest.cpp | 1172 | ||||
-rw-r--r-- | storage/src/tests/visiting/visitortest.cpp | 1023 |
6 files changed, 2572 insertions, 0 deletions
diff --git a/storage/src/tests/visiting/.gitignore b/storage/src/tests/visiting/.gitignore new file mode 100644 index 00000000000..184e5d1c936 --- /dev/null +++ b/storage/src/tests/visiting/.gitignore @@ -0,0 +1,12 @@ +*.So +*.lo +*.o +.*.swp +.config.log +.depend +.depend.NEW +.deps +.libs +Makefile +testrunner +testrunner.core diff --git a/storage/src/tests/visiting/CMakeLists.txt b/storage/src/tests/visiting/CMakeLists.txt new file mode 100644 index 00000000000..60e130c003c --- /dev/null +++ b/storage/src/tests/visiting/CMakeLists.txt @@ -0,0 +1,11 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storage_testvisiting + SOURCES + commandqueuetest.cpp + visitormanagertest.cpp + visitortest.cpp + memory_bounded_trace_test.cpp + DEPENDS + AFTER + storage_storageconfig +) diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp new file mode 100644 index 00000000000..5d6da5f7ea5 --- /dev/null +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -0,0 +1,223 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <vespa/storage/visiting/commandqueue.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +using vespalib::string; + +namespace storage { + +struct CommandQueueTest : public CppUnit::TestFixture +{ + void testFIFO(); + void testFIFOWithPriorities(); + void testReleaseOldest(); + void testReleaseLowestPriority(); + void testDeleteIterator(); + + CPPUNIT_TEST_SUITE(CommandQueueTest); + CPPUNIT_TEST(testFIFO); + CPPUNIT_TEST(testFIFOWithPriorities); + CPPUNIT_TEST(testReleaseOldest); + CPPUNIT_TEST(testReleaseLowestPriority); + CPPUNIT_TEST(testDeleteIterator); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(CommandQueueTest); + +namespace { + std::shared_ptr<api::CreateVisitorCommand> getCommand( + const vespalib::stringref & name, int timeout, + uint8_t priority = 0) + { + vespalib::asciistream ost; + ost << name << " t=" << timeout << " p=" << static_cast<unsigned int>(priority); + // Piggyback name in document selection + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("", "", ost.str())); + cmd->setQueueTimeout(timeout); + cmd->setPriority(priority); + return cmd; + } + + const vespalib::string & + getCommandString(const std::shared_ptr<api::CreateVisitorCommand>& cmd) + { + return cmd->getDocumentSelection(); + } + +} + +void CommandQueueTest::testFIFO() { + framework::defaultimplementation::FakeClock clock; + CommandQueue<api::CreateVisitorCommand> queue(clock); + CPPUNIT_ASSERT(queue.empty()); + // Use all default priorities, meaning what comes out should be in the same order + // as what went in + queue.add(getCommand("first", 1)); + queue.add(getCommand("second", 10)); + queue.add(getCommand("third", 5)); + queue.add(getCommand("fourth", 0)); + queue.add(getCommand("fifth", 3)); + queue.add(getCommand("sixth", 14)); + queue.add(getCommand("seventh", 7)); + + CPPUNIT_ASSERT(!queue.empty()); + std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands; + for (;;) { + std::shared_ptr<api::CreateVisitorCommand> cmd( + queue.releaseNextCommand().first); + if (cmd.get() == 0) break; + commands.push_back(cmd); + } + CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size()); + CPPUNIT_ASSERT_EQUAL(string("first t=1 p=0"), getCommandString(commands[0])); + CPPUNIT_ASSERT_EQUAL(string("second t=10 p=0"), getCommandString(commands[1])); + CPPUNIT_ASSERT_EQUAL(string("third t=5 p=0"), getCommandString(commands[2])); + CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=0"), getCommandString(commands[3])); + CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=0"), getCommandString(commands[4])); + CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=0"), getCommandString(commands[5])); + CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[6])); +} + +void CommandQueueTest::testFIFOWithPriorities() { + framework::defaultimplementation::FakeClock clock; + CommandQueue<api::CreateVisitorCommand> queue(clock); + CPPUNIT_ASSERT(queue.empty()); + + queue.add(getCommand("first", 1, 10)); + CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(queue.peekLowestPriorityCommand())); + queue.add(getCommand("second", 10, 22)); + queue.add(getCommand("third", 5, 9)); + CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(queue.peekLowestPriorityCommand())); + queue.add(getCommand("fourth", 0, 22)); + queue.add(getCommand("fifth", 3, 22)); + CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(queue.peekLowestPriorityCommand())); + queue.add(getCommand("sixth", 14, 50)); + queue.add(getCommand("seventh", 7, 0)); + + CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(queue.peekLowestPriorityCommand())); + + CPPUNIT_ASSERT(!queue.empty()); + std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands; + for (;;) { + std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekNextCommand()); + std::shared_ptr<api::CreateVisitorCommand> cmd(queue.releaseNextCommand().first); + if (cmd.get() == 0 || cmdPeek != cmd) break; + commands.push_back(cmd); + } + CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size()); + CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[0])); + CPPUNIT_ASSERT_EQUAL(string("third t=5 p=9"), getCommandString(commands[1])); + CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(commands[2])); + CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(commands[3])); + CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=22"), getCommandString(commands[4])); + CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(commands[5])); + CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(commands[6])); +} + +void CommandQueueTest::testReleaseOldest() { + framework::defaultimplementation::FakeClock clock(framework::defaultimplementation::FakeClock::FAKE_ABSOLUTE); + CommandQueue<api::CreateVisitorCommand> queue(clock); + CPPUNIT_ASSERT(queue.empty()); + queue.add(getCommand("first", 10)); + queue.add(getCommand("second", 100)); + queue.add(getCommand("third", 1000)); + queue.add(getCommand("fourth", 5)); + queue.add(getCommand("fifth", 3000)); + queue.add(getCommand("sixth", 400)); + queue.add(getCommand("seventh", 700)); + CPPUNIT_ASSERT_EQUAL(7u, queue.size()); + + typedef CommandQueue<api::CreateVisitorCommand>::CommandEntry CommandEntry; + std::list<CommandEntry> timedOut(queue.releaseTimedOut()); + CPPUNIT_ASSERT(timedOut.empty()); + clock.addMilliSecondsToTime(400 * 1000); + timedOut = queue.releaseTimedOut(); + CPPUNIT_ASSERT_EQUAL(size_t(4), timedOut.size()); + std::ostringstream ost; + for (std::list<CommandEntry>::const_iterator it = timedOut.begin(); + it != timedOut.end(); ++it) + { + ost << getCommandString(it->_command) << "\n"; + } + CPPUNIT_ASSERT_EQUAL(std::string( + "fourth t=5 p=0\n" + "first t=10 p=0\n" + "second t=100 p=0\n" + "sixth t=400 p=0\n"), ost.str()); + CPPUNIT_ASSERT_EQUAL(3u, queue.size()); +} + +void CommandQueueTest::testReleaseLowestPriority() { + framework::defaultimplementation::FakeClock clock; + CommandQueue<api::CreateVisitorCommand> queue(clock); + CPPUNIT_ASSERT(queue.empty()); + + queue.add(getCommand("first", 1, 10)); + queue.add(getCommand("second", 10, 22)); + queue.add(getCommand("third", 5, 9)); + queue.add(getCommand("fourth", 0, 22)); + queue.add(getCommand("fifth", 3, 22)); + queue.add(getCommand("sixth", 14, 50)); + queue.add(getCommand("seventh", 7, 0)); + CPPUNIT_ASSERT_EQUAL(7u, queue.size()); + + std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands; + for (;;) { + std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand()); + std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd( + queue.releaseLowestPriorityCommand()); + if (cmd.first.get() == 0 || cmdPeek != cmd.first) break; + commands.push_back(cmd.first); + } + CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size()); + CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(commands[0])); + CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(commands[1])); + CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=22"), getCommandString(commands[2])); + CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(commands[3])); + CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(commands[4])); + CPPUNIT_ASSERT_EQUAL(string("third t=5 p=9"), getCommandString(commands[5])); + CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[6])); +} + +void CommandQueueTest::testDeleteIterator() { + framework::defaultimplementation::FakeClock clock; + CommandQueue<api::CreateVisitorCommand> queue(clock); + CPPUNIT_ASSERT(queue.empty()); + queue.add(getCommand("first", 10)); + queue.add(getCommand("second", 100)); + queue.add(getCommand("third", 1000)); + queue.add(getCommand("fourth", 5)); + queue.add(getCommand("fifth", 3000)); + queue.add(getCommand("sixth", 400)); + queue.add(getCommand("seventh", 700)); + CPPUNIT_ASSERT_EQUAL(7u, queue.size()); + + CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin(); + ++it; ++it; + queue.erase(it); + CPPUNIT_ASSERT_EQUAL(6u, queue.size()); + + std::vector<std::shared_ptr<api::CreateVisitorCommand> > cmds; + for (;;) { + std::shared_ptr<api::CreateVisitorCommand> cmd( + std::dynamic_pointer_cast<api::CreateVisitorCommand>( + queue.releaseNextCommand().first)); + if (cmd.get() == 0) break; + cmds.push_back(cmd); + } + CPPUNIT_ASSERT_EQUAL(size_t(6), cmds.size()); + CPPUNIT_ASSERT_EQUAL(string("first t=10 p=0"), getCommandString(cmds[0])); + CPPUNIT_ASSERT_EQUAL(string("second t=100 p=0"), getCommandString(cmds[1])); + CPPUNIT_ASSERT_EQUAL(string("fourth t=5 p=0"), getCommandString(cmds[2])); + CPPUNIT_ASSERT_EQUAL(string("fifth t=3000 p=0"), getCommandString(cmds[3])); + CPPUNIT_ASSERT_EQUAL(string("sixth t=400 p=0"), getCommandString(cmds[4])); + CPPUNIT_ASSERT_EQUAL(string("seventh t=700 p=0"), getCommandString(cmds[5])); +} + +} + diff --git a/storage/src/tests/visiting/memory_bounded_trace_test.cpp b/storage/src/tests/visiting/memory_bounded_trace_test.cpp new file mode 100644 index 00000000000..85eae12fc34 --- /dev/null +++ b/storage/src/tests/visiting/memory_bounded_trace_test.cpp @@ -0,0 +1,131 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/storage/visiting/memory_bounded_trace.h> + +namespace storage { + +class MemoryBoundedTraceTest : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(MemoryBoundedTraceTest); + CPPUNIT_TEST(noMemoryReportedUsedWhenEmpty); + CPPUNIT_TEST(memoryUsedIsStringLengthForLeafNode); + CPPUNIT_TEST(memoryUsedIsAccumulatedRecursivelyForNonLeafNodes); + CPPUNIT_TEST(traceNodesCanBeMovedAndImplicitlyCleared); + CPPUNIT_TEST(movedTraceTreeIsMarkedAsStrict); + CPPUNIT_TEST(canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound); + CPPUNIT_TEST(movedTreeIncludesStatsNodeWhenNodesOmitted); + CPPUNIT_TEST_SUITE_END(); + +public: + void noMemoryReportedUsedWhenEmpty(); + void memoryUsedIsStringLengthForLeafNode(); + void memoryUsedIsAccumulatedRecursivelyForNonLeafNodes(); + void traceNodesCanBeMovedAndImplicitlyCleared(); + void movedTraceTreeIsMarkedAsStrict(); + void canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound(); + void movedTreeIncludesStatsNodeWhenNodesOmitted(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MemoryBoundedTraceTest); + +void +MemoryBoundedTraceTest::noMemoryReportedUsedWhenEmpty() +{ + MemoryBoundedTrace trace(100); + CPPUNIT_ASSERT_EQUAL(size_t(0), trace.getApproxMemoryUsed()); +} + +void +MemoryBoundedTraceTest::memoryUsedIsStringLengthForLeafNode() +{ + MemoryBoundedTrace trace(100); + CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0))); + CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed()); +} + +void +MemoryBoundedTraceTest::memoryUsedIsAccumulatedRecursivelyForNonLeafNodes() +{ + MemoryBoundedTrace trace(100); + mbus::TraceNode innerNode; + innerNode.addChild("hello world"); + innerNode.addChild("goodbye moon"); + CPPUNIT_ASSERT(trace.add(innerNode)); + CPPUNIT_ASSERT_EQUAL(size_t(23), trace.getApproxMemoryUsed()); +} + +void +MemoryBoundedTraceTest::traceNodesCanBeMovedAndImplicitlyCleared() +{ + MemoryBoundedTrace trace(100); + CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0))); + mbus::TraceNode target; + trace.moveTraceTo(target); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren()); + CPPUNIT_ASSERT_EQUAL(size_t(0), trace.getApproxMemoryUsed()); + + mbus::TraceNode emptinessCheck; + trace.moveTraceTo(emptinessCheck); + CPPUNIT_ASSERT_EQUAL(uint32_t(0), emptinessCheck.getNumChildren()); +} + +/** + * We want trace subtrees to be strictly ordered so that the message about + * omitted traces will remain soundly as the last ordered node. There is no + * particular performance reason for not having strict mode enabled to the + * best of my knowledge, since the internal backing data structure is an + * ordered vector anyhow. + */ +void +MemoryBoundedTraceTest::movedTraceTreeIsMarkedAsStrict() +{ + MemoryBoundedTrace trace(100); + CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0))); + mbus::TraceNode target; + trace.moveTraceTo(target); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren()); + CPPUNIT_ASSERT(target.getChild(0).isStrict()); +} + +void +MemoryBoundedTraceTest::canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound() +{ + // Note: we allow one complete node tree to exceed the bounds, but as soon + // as the bound is exceeded no further nodes can be added. + MemoryBoundedTrace trace(10); + CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0))); + CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed()); + + CPPUNIT_ASSERT(!trace.add(mbus::TraceNode("the quick red fox runs across " + "the freeway", 0))); + CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed()); + + mbus::TraceNode target; + trace.moveTraceTo(target); + // Twice nested node (root -> added trace tree -> leaf with txt). + CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren()); + CPPUNIT_ASSERT(target.getChild(0).getNumChildren() >= 1); + CPPUNIT_ASSERT_EQUAL(vespalib::string("hello world"), + target.getChild(0).getChild(0).getNote()); +} + +void +MemoryBoundedTraceTest::movedTreeIncludesStatsNodeWhenNodesOmitted() +{ + MemoryBoundedTrace trace(5); + CPPUNIT_ASSERT(trace.add(mbus::TraceNode("abcdef", 0))); + CPPUNIT_ASSERT(!trace.add(mbus::TraceNode("ghijkjlmn", 0))); + + mbus::TraceNode target; + trace.moveTraceTo(target); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren()); + CPPUNIT_ASSERT_EQUAL(uint32_t(2), target.getChild(0).getNumChildren()); + vespalib::string expected("Trace too large; omitted 1 subsequent trace " + "trees containing a total of 9 bytes"); + CPPUNIT_ASSERT_EQUAL(expected, target.getChild(0).getChild(1).getNote()); +} + +} // storage + diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp new file mode 100644 index 00000000000..d782abf7d54 --- /dev/null +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -0,0 +1,1172 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/document/datatype/datatype.h> +#include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/fieldvalue/stringfieldvalue.h> +#include <vespa/document/fieldvalue/rawfieldvalue.h> +#include <vespa/log/log.h> +#include <vespa/storageapi/message/datagram.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> +#include <vector> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/visiting/visitormanager.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> +#include <tests/common/dummystoragelink.h> +#include <tests/storageserver/testvisitormessagesession.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vdslib/container/visitorordering.h> +#include <vespa/documentapi/messagebus/messages/multioperationmessage.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> + + +LOG_SETUP(".visitormanagertest"); + +namespace storage { +namespace { + typedef std::vector<api::StorageMessage::SP> msg_ptr_vector; +} + +class VisitorManagerTest : public CppUnit::TestFixture +{ +private: + CPPUNIT_TEST_SUITE(VisitorManagerTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST(testResending); + CPPUNIT_TEST(testVisitEmptyBucket); + CPPUNIT_TEST(testMultiBucketVisit); + CPPUNIT_TEST(testNoBuckets); + CPPUNIT_TEST(testVisitPutsAndRemoves); + CPPUNIT_TEST(testVisitWithTimeframeAndSelection); + CPPUNIT_TEST(testVisitWithTimeframeAndBogusSelection); + CPPUNIT_TEST(testVisitorCallbacks); + CPPUNIT_TEST(testVisitorCleanup); + CPPUNIT_TEST(testAbortOnFailedVisitorInfo); + CPPUNIT_TEST(testAbortOnFieldPathError); + CPPUNIT_TEST(testVisitorQueueTimeout); + CPPUNIT_TEST(testVisitorProcessingTimeout); + CPPUNIT_TEST(testPrioritizedVisitorQueing); + CPPUNIT_TEST(testPrioritizedMaxConcurrentVisitors); + CPPUNIT_TEST(testVisitorQueingZeroQueueSize); + CPPUNIT_TEST(testHitCounter); + CPPUNIT_TEST(testStatusPage); + CPPUNIT_TEST_SUITE_END(); + + static uint32_t docCount; + std::vector<document::Document::SP > _documents; + std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory; + std::unique_ptr<TestServiceLayerApp> _node; + std::unique_ptr<DummyStorageLink> _top; + VisitorManager* _manager; + +public: + VisitorManagerTest() : _node() {} + + // Not using setUp since can't throw exception out of it. + void initializeTest(); + void addSomeRemoves(bool removeAll = false); + void tearDown(); + TestVisitorMessageSession& getSession(uint32_t n); + uint64_t verifyCreateVisitorReply( + api::ReturnCode::Result expectedResult, + int checkStatsDocsVisited = -1, + int checkStatsBytesVisited = -1); + void getMessagesAndReply( + int expectedCount, + TestVisitorMessageSession& session, + std::vector<document::Document::SP >& docs, + std::vector<document::DocumentId>& docIds, + api::ReturnCode::Result returnCode = api::ReturnCode::OK, + documentapi::Priority::Value priority = documentapi::Priority::PRI_NORMAL_4); + uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs); + + void testNormalUsage(); + void testResending(); + void testVisitEmptyBucket(); + void testMultiBucketVisit(); + void testNoBuckets(); + void testVisitPutsAndRemoves(); + void testVisitWithTimeframeAndSelection(); + void testVisitWithTimeframeAndBogusSelection(); + void testVisitorCallbacks(); + void testVisitorCleanup(); + void testAbortOnFailedVisitorInfo(); + void testAbortOnFieldPathError(); + void testVisitorQueueTimeout(); + void testVisitorProcessingTimeout(); + void testPrioritizedVisitorQueing(); + void testPrioritizedMaxConcurrentVisitors(); + void testVisitorQueingZeroQueueSize(); + void testHitCounter(); + void testStatusPage(); +}; + +uint32_t VisitorManagerTest::docCount = 10; + +CPPUNIT_TEST_SUITE_REGISTRATION(VisitorManagerTest); + +void +VisitorManagerTest::initializeTest() +{ + LOG(debug, "Initializing test"); + vdstestlib::DirConfig config(getStandardConfig(true)); + config.getConfig("stor-visitor").set("visitorthreads", "1"); + + try { + _messageSessionFactory.reset( + new TestVisitorMessageSessionFactory(config.getConfigId())); + _node.reset( + new TestServiceLayerApp(config.getConfigId())); + _node->setupDummyPersistence(); + _node->getStateUpdater().setClusterState( + lib::ClusterState::CSP( + new lib::ClusterState("storage:1 distributor:1"))); + _top.reset(new DummyStorageLink()); + _top->push_back(std::unique_ptr<StorageLink>(_manager + = new VisitorManager( + config.getConfigId(), _node->getComponentRegister(), + *_messageSessionFactory))); + _top->push_back(std::unique_ptr<StorageLink>(new FileStorManager( + config.getConfigId(), _node->getPartitions(), _node->getPersistenceProvider(), _node->getComponentRegister()))); + _manager->setTimeBetweenTicks(10); + _top->open(); + } catch (config::InvalidConfigException& e) { + fprintf(stderr, "%s\n", e.what()); + } + // Adding some documents so database isn't empty + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::string content( + "To be, or not to be: that is the question:\n" + "Whether 'tis nobler in the mind to suffer\n" + "The slings and arrows of outrageous fortune,\n" + "Or to take arms against a sea of troubles,\n" + "And by opposing end them? To die: to sleep;\n" + "No more; and by a sleep to say we end\n" + "The heart-ache and the thousand natural shocks\n" + "That flesh is heir to, 'tis a consummation\n" + "Devoutly to be wish'd. To die, to sleep;\n" + "To sleep: perchance to dream: ay, there's the rub;\n" + "For in that sleep of death what dreams may come\n" + "When we have shuffled off this mortal coil,\n" + "Must give us pause: there's the respect\n" + "That makes calamity of so long life;\n" + "For who would bear the whips and scorns of time,\n" + "The oppressor's wrong, the proud man's contumely,\n" + "The pangs of despised love, the law's delay,\n" + "The insolence of office and the spurns\n" + "That patient merit of the unworthy takes,\n" + "When he himself might his quietus make\n" + "With a bare bodkin? who would fardels bear,\n" + "To grunt and sweat under a weary life,\n" + "But that the dread of something after death,\n" + "The undiscover'd country from whose bourn\n" + "No traveller returns, puzzles the will\n" + "And makes us rather bear those ills we have\n" + "Than fly to others that we know not of?\n" + "Thus conscience does make cowards of us all;\n" + "And thus the native hue of resolution\n" + "Is sicklied o'er with the pale cast of thought,\n" + "And enterprises of great pith and moment\n" + "With this regard their currents turn awry,\n" + "And lose the name of action. - Soft you now!\n" + "The fair Ophelia! Nymph, in thy orisons\n" + "Be all my sins remember'd.\n"); + for (uint32_t i=0; i<docCount; ++i) { + std::ostringstream uri; + uri << "userdoc:test:" << i % 10 << ":http://www.ntnu.no/" + << i << ".html"; + + _documents.push_back(document::Document::SP( + _node->getTestDocMan().createDocument(content, uri.str()))); + const document::DocumentType& type(_documents.back()->getType()); + _documents.back()->setValue(type.getField("headerval"), + document::IntFieldValue(i % 4)); + } + for (uint32_t i=0; i<10; ++i) { + document::BucketId bid(16, i); + + std::shared_ptr<api::CreateBucketCommand> cmd( + new api::CreateBucketCommand(bid)); + cmd->setAddress(address); + cmd->setSourceIndex(0); + _top->sendDown(cmd); + _top->waitForMessages(1, 60); + _top->reset(); + + StorBucketDatabase::WrappedEntry entry( + _node->getStorageBucketDatabase().get(bid, "", + StorBucketDatabase::CREATE_IF_NONEXISTING)); + entry->disk = 0; + entry.write(); + } + for (uint32_t i=0; i<docCount; ++i) { + document::BucketId bid(16, i); + + std::shared_ptr<api::PutCommand> cmd( + new api::PutCommand(bid, _documents[i], i+1)); + cmd->setAddress(address); + _top->sendDown(cmd); + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); + std::shared_ptr<api::PutReply> reply( + std::dynamic_pointer_cast<api::PutReply>( + replies[0])); + CPPUNIT_ASSERT(reply.get()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), + reply->getResult()); + } + LOG(debug, "Done initializing test"); +} + +void +VisitorManagerTest::addSomeRemoves(bool removeAll) +{ + framework::defaultimplementation::FakeClock clock; + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + for (uint32_t i=0; i<docCount; i += (removeAll ? 1 : 4)) { + // Add it to the database + document::BucketId bid(16, i % 10); + std::shared_ptr<api::RemoveCommand> cmd( + new api::RemoveCommand( + bid, _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1)); + cmd->setAddress(address); + _top->sendDown(cmd); + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); + std::shared_ptr<api::RemoveReply> reply( + std::dynamic_pointer_cast<api::RemoveReply>( + replies[0])); + CPPUNIT_ASSERT(reply.get()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), + reply->getResult()); + } +} + +void +VisitorManagerTest::tearDown() +{ + if (_top.get() != 0) { + _top->close(); + _top->flush(); + _top.reset(0); + } + _node.reset(0); + _messageSessionFactory.reset(0); + _manager = 0; +} + +TestVisitorMessageSession& +VisitorManagerTest::getSession(uint32_t n) +{ + // Wait until we have started the visitor + const std::vector<TestVisitorMessageSession*>& sessions( + _messageSessionFactory->_visitorSessions); + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + while (true) { + { + vespalib::LockGuard lock(_messageSessionFactory->_accessLock); + if (sessions.size() > n) { + return *sessions[n]; + } + } + if (clock.getTimeInMillis() > endTime) { + throw vespalib::IllegalStateException( + "Timed out waiting for visitor session", VESPA_STRLOC); + } + FastOS_Thread::Sleep(10); + } + throw std::logic_error("unreachable"); +} + +void +VisitorManagerTest::getMessagesAndReply( + int expectedCount, + TestVisitorMessageSession& session, + std::vector<document::Document::SP >& docs, + std::vector<document::DocumentId>& docIds, + api::ReturnCode::Result result, + documentapi::Priority::Value priority) +{ + for (int i = 0; i < expectedCount; i++) { + session.waitForMessages(i + 1); + mbus::Reply::UP reply; + { + vespalib::MonitorGuard guard(session.getMonitor()); + + CPPUNIT_ASSERT_EQUAL(priority, + session.sentMessages[i]->getPriority()); + + switch (session.sentMessages[i]->getType()) { + case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: + docs.push_back(static_cast<documentapi::PutDocumentMessage&>( + *session.sentMessages[i]).getDocument()); + break; + case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>( + *session.sentMessages[i]).getDocumentId()); + break; + default: + break; + } + + reply = session.sentMessages[i]->createReply(); + reply->swapState(*session.sentMessages[i]); + reply->setMessage( + mbus::Message::UP(session.sentMessages[i].release())); + + if (result != api::ReturnCode::OK) { + reply->addError(mbus::Error(result, "Generic error")); + } + } + + session.reply(std::move(reply)); + } +} + +uint64_t +VisitorManagerTest::verifyCreateVisitorReply( + api::ReturnCode::Result expectedResult, + int checkStatsDocsVisited, + int checkStatsBytesVisited) +{ + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL(1, (int)replies.size()); + + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult()); + + if (checkStatsDocsVisited >= 0) { + CPPUNIT_ASSERT_EQUAL(checkStatsDocsVisited, + int(reply->getVisitorStatistics().getDocumentsVisited())); + } + if (checkStatsBytesVisited >= 0) { + CPPUNIT_ASSERT_EQUAL(checkStatsBytesVisited, + int(reply->getVisitorStatistics().getBytesVisited())); + } + + return reply->getMsgId(); +} + +uint32_t +VisitorManagerTest::getMatchingDocuments(std::vector<document::Document::SP >& docs) { + uint32_t equalCount = 0; + for (uint32_t i=0; i<docs.size(); ++i) { + for (uint32_t j=0; j<_documents.size(); ++j) { + if (docs[i]->getId() == _documents[j]->getId() + && *docs[i] == *_documents[j]) + + { + equalCount++; + } + } + } + + return equalCount; +} + +void +VisitorManagerTest::testHitCounter() +{ + document::OrderingSpecification spec(document::OrderingSpecification::ASCENDING, 42, 7, 2); + Visitor::HitCounter hitCounter(&spec); + + hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:42:foo"), 450); + hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:49:foo"), 450); + hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:60:foo"), 450); + hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:10:foo"), 450); + hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:21:foo"), 450); + + CPPUNIT_ASSERT_EQUAL(3, (int)hitCounter.getFirstPassHits()); + CPPUNIT_ASSERT_EQUAL(1350, (int)hitCounter.getFirstPassBytes()); + CPPUNIT_ASSERT_EQUAL(2, (int)hitCounter.getSecondPassHits()); + CPPUNIT_ASSERT_EQUAL(900, (int)hitCounter.getSecondPassBytes()); +} + +namespace { + +int getTotalSerializedSize(const std::vector<document::Document::SP>& docs) +{ + int total = 0; + for (size_t i = 0; i < docs.size(); ++i) { + total += int(docs[i]->serialize()->getLength()); + } + return total; +} + +} + +void +VisitorManagerTest::testNormalUsage() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setControlDestination("foo/bar"); + _top->sendDown(cmd); + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + // Should receive one multioperation message (bucket 3 has one document). + getMessagesAndReply(1, getSession(0), docs, docIds); + + // All data has been replied to, expecting to get a create visitor reply + verifyCreateVisitorReply(api::ReturnCode::OK, + int(docs.size()), + getTotalSerializedSize(docs)); + + CPPUNIT_ASSERT_EQUAL(1u, getMatchingDocuments(docs)); + CPPUNIT_ASSERT(!_manager->hasPendingMessageState()); +} + +void +VisitorManagerTest::testResending() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setControlDestination("foo/bar"); + _top->sendDown(cmd); + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + TestVisitorMessageSession& session = getSession(0); + getMessagesAndReply(1, session, docs, docIds, api::ReturnCode::NOT_READY); + + { + session.waitForMessages(2); + + documentapi::DocumentMessage* msg = session.sentMessages[1].get(); + + mbus::Reply::UP reply = msg->createReply(); + + CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, + session.sentMessages[1]->getType()); + reply->swapState(*session.sentMessages[1]); + reply->setMessage(mbus::Message::UP(session.sentMessages[1].release())); + session.reply(std::move(reply)); + } + + _node->getClock().addSecondsToTime(1); + + { + session.waitForMessages(3); + + documentapi::DocumentMessage* msg = session.sentMessages[2].get(); + + mbus::Reply::UP reply = msg->createReply(); + + reply->swapState(*session.sentMessages[2]); + reply->setMessage(mbus::Message::UP(session.sentMessages[2].release())); + session.reply(std::move(reply)); + } + + // All data has been replied to, expecting to get a create visitor reply + verifyCreateVisitorReply(api::ReturnCode::OK); +} + +void +VisitorManagerTest::testVisitEmptyBucket() +{ + initializeTest(); + addSomeRemoves(true); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + + cmd->setAddress(address); + _top->sendDown(cmd); + + // All data has been replied to, expecting to get a create visitor reply + verifyCreateVisitorReply(api::ReturnCode::OK); +} + +void +VisitorManagerTest::testMultiBucketVisit() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + for (uint32_t i=0; i<10; ++i) { + cmd->addBucketToBeVisited(document::BucketId(16, i)); + } + cmd->setAddress(address); + cmd->setDataDestination("fooclient.0"); + _top->sendDown(cmd); + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + // Should receive one multioperation message for each bucket + getMessagesAndReply(10, getSession(0), docs, docIds); + + // All data has been replied to, expecting to get a create visitor reply + verifyCreateVisitorReply(api::ReturnCode::OK); + + CPPUNIT_ASSERT_EQUAL(docCount, getMatchingDocuments(docs)); +} + +void +VisitorManagerTest::testNoBuckets() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + + cmd->setAddress(address); + _top->sendDown(cmd); + + // Should get one reply; a CreateVisitorReply with error since no + // buckets where specified in the CreateVisitorCommand + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>( + replies[0])); + // Verify that cast went ok => it was a CreateVisitorReply message + CPPUNIT_ASSERT(reply.get()); + api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS, + "No buckets specified"); + CPPUNIT_ASSERT_EQUAL(ret, reply->getResult()); +} + +void VisitorManagerTest::testVisitPutsAndRemoves() +{ + initializeTest(); + addSomeRemoves(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->setAddress(address); + cmd->setVisitRemoves(); + for (uint32_t i=0; i<10; ++i) { + cmd->addBucketToBeVisited(document::BucketId(16, i)); + } + _top->sendDown(cmd); + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + getMessagesAndReply(10, getSession(0), docs, docIds); + + verifyCreateVisitorReply(api::ReturnCode::OK); + + CPPUNIT_ASSERT_EQUAL( + docCount - (docCount + 3) / 4, + getMatchingDocuments(docs)); + + CPPUNIT_ASSERT_EQUAL( + (size_t) (docCount + 3) / 4, + docIds.size()); +} + +void VisitorManagerTest::testVisitWithTimeframeAndSelection() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", + "testdoctype1.headerval < 2")); + cmd->setFromTime(3); + cmd->setToTime(8); + for (uint32_t i=0; i<10; ++i) { + cmd->addBucketToBeVisited(document::BucketId(16, i)); + } + cmd->setAddress(address); + _top->sendDown(cmd); + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + getMessagesAndReply(2, getSession(0), docs, docIds); + + verifyCreateVisitorReply(api::ReturnCode::OK); + + CPPUNIT_ASSERT_EQUAL((size_t) 2, docs.size()); + std::set<std::string> expected; + expected.insert("userdoc:test:4:http://www.ntnu.no/4.html"); + expected.insert("userdoc:test:5:http://www.ntnu.no/5.html"); + std::set<std::string> actual; + for (uint32_t i=0; i<docs.size(); ++i) { + actual.insert(docs[i]->getId().toString()); + } + CPPUNIT_ASSERT_EQUAL(expected, actual); +} + +void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", + "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2")); + cmd->setFromTime(3); + cmd->setToTime(8); + for (uint32_t i=0; i<10; ++i) { + cmd->addBucketToBeVisited(document::BucketId(16, i)); + } + cmd->setAddress(address); + + _top->sendDown(cmd); + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size()); + + api::StorageReply* reply = dynamic_cast<api::StorageReply*>( + replies.front().get()); + CPPUNIT_ASSERT(reply); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS, + reply->getResult().getResult()); +} + +void +VisitorManagerTest::testVisitorCallbacks() +{ + initializeTest(); + std::ostringstream replydata; + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("TestVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->addBucketToBeVisited(document::BucketId(16, 5)); + cmd->setAddress(address); + _top->sendDown(cmd); + + // Wait until we have started the visitor + TestVisitorMessageSession& session = getSession(0); + + for (uint32_t i = 0; i < 6; i++) { + session.waitForMessages(i + 1); + mbus::Reply::UP reply; + { + vespalib::MonitorGuard guard(session.getMonitor()); + + CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_MAPVISITOR, session.sentMessages[i]->getType()); + + documentapi::MapVisitorMessage* mapvisitormsg( + static_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get())); + + replydata << mapvisitormsg->getData().get("msg"); + + reply = mapvisitormsg->createReply(); + reply->swapState(*session.sentMessages[i]); + reply->setMessage(mbus::Message::UP(session.sentMessages[i].release())); + } + session.reply(std::move(reply)); + } + + // All data has been replied to, expecting to get a create visitor reply + verifyCreateVisitorReply(api::ReturnCode::OK); + + CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 1, "Starting visitor"); + CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 2, "Handling block of 1 documents"); + CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 2, "completedBucket"); + CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 1, "completedVisiting"); +} + +void +VisitorManagerTest::testVisitorCleanup() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + + // Start a bunch of invalid visitors + for (uint32_t i=0; i<10; ++i) { + std::ostringstream ost; + ost << "testvis" << i; + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("InvalidVisitor", ost.str(), "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + _top->sendDown(cmd); + _top->waitForMessages(i+1, 60); + } + + // Start a bunch of visitors + for (uint32_t i=0; i<10; ++i) { + std::ostringstream ost; + ost << "testvis" << (i + 10); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", ost.str(), "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + _top->sendDown(cmd); + } + + + // Should get 14 immediate replies - 10 failures and 4 busy + { + _top->waitForMessages(14, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + + int failures = 0; + int busy = 0; + + for (uint32_t i=0; i< 14; ++i) { + std::shared_ptr<api::StorageMessage> msg(replies[i]); + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + + if (i < 10) { + if (api::ReturnCode::ILLEGAL_PARAMETERS == reply->getResult().getResult()) { + failures++; + } else { + std::cerr << reply->getResult() << "\n"; + } + } else { + if (api::ReturnCode::BUSY == reply->getResult().getResult()) { + busy++; + } + } + } + + CPPUNIT_ASSERT_EQUAL(10, failures); + CPPUNIT_ASSERT_EQUAL(4, busy); + } + + // Finish a visitor + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + getMessagesAndReply(1, getSession(0), docs, docIds); + + // Should get a reply for the visitor. + verifyCreateVisitorReply(api::ReturnCode::OK); + + // Fail a visitor + getMessagesAndReply(1, getSession(1), docs, docIds, api::ReturnCode::INTERNAL_FAILURE); + + // Should get a reply for the visitor. + verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE); + + while (_manager->getActiveVisitorCount() > 2) { + FastOS_Thread::Sleep(10); + } + + // Start a bunch of more visitors + for (uint32_t i=0; i<10; ++i) { + std::ostringstream ost; + ost << "testvis" << (i + 24); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", ost.str(), "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + _top->sendDown(cmd); + } + + // Should now get 8 busy. + _top->waitForMessages(8, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL(8, (int)replies.size()); + + for (uint32_t i=0; i< replies.size(); ++i) { + std::shared_ptr<api::StorageMessage> msg(replies[i]); + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::BUSY, reply->getResult().getResult()); + } +} + +void +VisitorManagerTest::testAbortOnFailedVisitorInfo() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + + { + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + _top->sendDown(cmd); + } + + uint32_t visitorRepliesReceived = 0; + uint32_t oki = 0; + uint32_t failed = 0; + + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + TestVisitorMessageSession& session = getSession(0); + getMessagesAndReply(1, session, docs, docIds, api::ReturnCode::NOT_READY); + + { + session.waitForMessages(2); + + documentapi::DocumentMessage* cmd = session.sentMessages[1].get(); + + mbus::Reply::UP reply = cmd->createReply(); + + CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, session.sentMessages[1]->getType()); + reply->swapState(*session.sentMessages[1]); + reply->setMessage(mbus::Message::UP(session.sentMessages[1].release())); + reply->addError(mbus::Error(api::ReturnCode::NOT_CONNECTED, "Me no ready")); + session.reply(std::move(reply)); + } + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + for (uint32_t i=0; i< replies.size(); ++i) { + std::shared_ptr<api::StorageMessage> msg(replies[i]); + if (msg->getType() == api::MessageType::VISITOR_CREATE_REPLY) + { + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + if (reply->getResult().success()) { + ++oki; + std::cerr << "\n" << reply->toString(true) << "\n"; + } else { + ++failed; + } + } + } + + std::ostringstream errmsg; + errmsg << "oki " << oki << ", failed " << failed; + + CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 0u, oki); + CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 1u, failed); +} + +void +VisitorManagerTest::testAbortOnFieldPathError() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + + // Use bogus field path to force error to happen + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", + "testvis", + "testdoctype1.headerval{bogus} == 1234")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + _top->sendDown(cmd); + + verifyCreateVisitorReply(api::ReturnCode::ILLEGAL_PARAMETERS); +} + +void +VisitorManagerTest::testVisitorQueueTimeout() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + _manager->enforceQueueUsage(); + + { + vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor()); + + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(1); + cmd->setTimeout(100 * 1000 * 1000); + _top->sendDown(cmd); + + _node->getClock().addSecondsToTime(1000); + } + + // Don't answer any messages. Make sure we timeout anyways. + uint32_t visitorRepliesReceived = 0; + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY, + "Visitor timed out in visitor queue"), + reply->getResult()); +} + +void +VisitorManagerTest::testVisitorProcessingTimeout() +{ + initializeTest(); + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(0); + cmd->setTimeout(100); + _top->sendDown(cmd); + + // Wait for Put before increasing the clock + TestVisitorMessageSession& session = getSession(0); + session.waitForMessages(1); + + _node->getClock().addSecondsToTime(1000); + + // Don't answer any messages. Make sure we timeout anyways. + uint32_t visitorRepliesReceived = 0; + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + ++visitorRepliesReceived; + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + reply->getResult().getResult()); +} + +namespace { + uint32_t nextVisitor = 0; + + api::StorageMessage::Id + sendCreateVisitor(uint32_t timeout, DummyStorageLink& top, uint8_t priority = 127) { + std::ostringstream ost; + ost << "testvis" << ++nextVisitor; + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand("DumpVisitor", ost.str(), "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setQueueTimeout(timeout); + cmd->setPriority(priority); + top.sendDown(cmd); + return cmd->getMsgId(); + } +} + +void +VisitorManagerTest::testPrioritizedVisitorQueing() +{ + framework::HttpUrlPath path("?verbose=true&allvisitors=true"); + initializeTest(); + + _manager->setMaxConcurrentVisitors(4); + _manager->setMaxVisitorQueueSize(4); + + api::StorageMessage::Id ids[10] = { 0 }; + + // First 4 should just start.. + for (uint32_t i = 0; i < 4; ++i) { + ids[i] = sendCreateVisitor(i, *_top, i); + } + + // Next ones should be queued - (Better not finish before we get here) + // Submit with higher priorities + for (uint32_t i = 0; i < 4; ++i) { + ids[i + 4] = sendCreateVisitor(1000, *_top, 100 - i); + } + + // Queue is now full with a pri 100 visitor at its end + // Send a lower pri visitor that will be busy-returned immediately + ids[8] = sendCreateVisitor(1000, *_top, 130); + + CPPUNIT_ASSERT_EQUAL(ids[8], verifyCreateVisitorReply(api::ReturnCode::BUSY)); + + // Send a higher pri visitor that will take the place of pri 100 visitor + ids[9] = sendCreateVisitor(1000, *_top, 60); + + CPPUNIT_ASSERT_EQUAL(ids[4], verifyCreateVisitorReply(api::ReturnCode::BUSY)); + + // Finish the first visitor + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK, + documentapi::Priority::PRI_HIGHEST); + verifyCreateVisitorReply(api::ReturnCode::OK); + + // We should now start the highest priority visitor. + getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK, + documentapi::Priority::PRI_VERY_HIGH); + CPPUNIT_ASSERT_EQUAL(ids[9], verifyCreateVisitorReply(api::ReturnCode::OK)); +} + +void +VisitorManagerTest::testPrioritizedMaxConcurrentVisitors() { + framework::HttpUrlPath path("?verbose=true&allvisitors=true"); + initializeTest(); + + api::StorageMessage::Id ids[17] = { 0 }; + + // Number of concurrent visitors is in [4, 8], depending on priority + // Max concurrent: + // [0, 1): 4 + // [1, 64): 3 + // [64, 128): 2 + // [128, 192): 1 + // [192, 256): 0 + _manager->setMaxConcurrentVisitors(4, 4); + _manager->setMaxVisitorQueueSize(6); + + // First 4 should just start.. + for (uint32_t i = 0; i < 4; ++i) { + ids[i] = sendCreateVisitor(i, *_top, i); + } + + // Low pri messages; get put into queue + for (uint32_t i = 0; i < 6; ++i) { + ids[i + 4] = sendCreateVisitor(1000, *_top, 203 - i); + } + + // Higher pri message: fits happily into 1 extra concurrent slot + ids[10] = sendCreateVisitor(1000, *_top, 190); + + // Should punch pri203 msg out of the queue -> busy + ids[11] = sendCreateVisitor(1000, *_top, 197); + + CPPUNIT_ASSERT_EQUAL(ids[4], verifyCreateVisitorReply(api::ReturnCode::BUSY)); + + // No concurrency slots left for this message -> busy + ids[12] = sendCreateVisitor(1000, *_top, 204); + + CPPUNIT_ASSERT_EQUAL(ids[12], verifyCreateVisitorReply(api::ReturnCode::BUSY)); + + // Gets a concurrent slot + ids[13] = sendCreateVisitor(1000, *_top, 80); + + // Kicks pri 202 out of the queue -> busy + ids[14] = sendCreateVisitor(1000, *_top, 79); + + CPPUNIT_ASSERT_EQUAL(ids[5], verifyCreateVisitorReply(api::ReturnCode::BUSY)); + + // Gets a concurrent slot + ids[15] = sendCreateVisitor(1000, *_top, 63); + + // Very Important Visitor(tm) gets a concurrent slot + ids[16] = sendCreateVisitor(1000, *_top, 0); + + std::vector<document::Document::SP > docs; + std::vector<document::DocumentId> docIds; + + std::set<uint64_t> finishedVisitors; + + // Verify that the correct visitors are running. + for (int i = 0; i < 8; i++) { + documentapi::Priority::Value priority = + documentapi::Priority::PRI_HIGHEST; // ids 0-3,16 + if (i == 4) { + priority = documentapi::Priority::PRI_VERY_LOW; // ids 10 + } else if (i == 5) { + priority = documentapi::Priority::PRI_HIGH_2; // ids 13 + } else if (i == 6) { + priority = documentapi::Priority::PRI_HIGH_1; // ids 15 + } + getMessagesAndReply(1, getSession(i), docs, docIds, api::ReturnCode::OK, + priority); + finishedVisitors.insert(verifyCreateVisitorReply(api::ReturnCode::OK)); + } + + for (int i = 0; i < 4; i++) { + CPPUNIT_ASSERT(finishedVisitors.find(ids[i]) != finishedVisitors.end()); + } + + CPPUNIT_ASSERT(finishedVisitors.find(ids[10]) != finishedVisitors.end()); + CPPUNIT_ASSERT(finishedVisitors.find(ids[13]) != finishedVisitors.end()); + CPPUNIT_ASSERT(finishedVisitors.find(ids[15]) != finishedVisitors.end()); + CPPUNIT_ASSERT(finishedVisitors.find(ids[16]) != finishedVisitors.end()); + + finishedVisitors.clear(); + + for (int i = 8; i < 14; i++) { + documentapi::Priority::Value priority = + documentapi::Priority::PRI_LOWEST; // ids 6-9,11 + if (i == 8) { + priority = documentapi::Priority::PRI_HIGH_2; // ids 14 + } + getMessagesAndReply(1, getSession(i), docs, docIds, api::ReturnCode::OK, + priority); + uint64_t msgId = verifyCreateVisitorReply(api::ReturnCode::OK); + finishedVisitors.insert(msgId); + } + + for (int i = 6; i < 10; i++) { + CPPUNIT_ASSERT(finishedVisitors.find(ids[i]) != finishedVisitors.end()); + } + + CPPUNIT_ASSERT(finishedVisitors.find(ids[11]) != finishedVisitors.end()); + CPPUNIT_ASSERT(finishedVisitors.find(ids[14]) != finishedVisitors.end()); +} + +void +VisitorManagerTest::testVisitorQueingZeroQueueSize() { + framework::HttpUrlPath path("?verbose=true&allvisitors=true"); + initializeTest(); + + _manager->setMaxConcurrentVisitors(4); + _manager->setMaxVisitorQueueSize(0); + + // First 4 should just start.. + for (uint32_t i = 0; i < 4; ++i) { + sendCreateVisitor(i, *_top, i); + } + // Queue size is zero, all visitors will be busy-returned + for (uint32_t i = 0; i < 5; ++i) { + sendCreateVisitor(1000, *_top, 100 - i); + verifyCreateVisitorReply(api::ReturnCode::BUSY); + } +} + +void +VisitorManagerTest::testStatusPage() { + framework::HttpUrlPath path("?verbose=true&allvisitors=true"); + initializeTest(); + + _manager->setMaxConcurrentVisitors(1, 1); + _manager->setMaxVisitorQueueSize(6); + // 1 running, 1 queued + sendCreateVisitor(1000000, *_top, 1); + sendCreateVisitor(1000000, *_top, 128); + + TestVisitorMessageSession& session = getSession(0); + session.waitForMessages(1); + + std::ostringstream ss; + static_cast<framework::HtmlStatusReporter&>(*_manager).reportHtmlStatus(ss, path); + + std::string str(ss.str()); + CPPUNIT_ASSERT(str.find("Currently running visitors") != std::string::npos); + // Should be propagated to visitor thread + CPPUNIT_ASSERT(str.find("Running 1 visitors") != std::string::npos); // 1 active + CPPUNIT_ASSERT(str.find("waiting visitors 1") != std::string::npos); // 1 queued + CPPUNIT_ASSERT(str.find("Visitor thread 0") != std::string::npos); + CPPUNIT_ASSERT(str.find("Disconnected visitor timeout") != std::string::npos); // verbose per thread + CPPUNIT_ASSERT(str.find("Message #1 <b>putdocumentmessage</b>") != std::string::npos); // 1 active +} + +} diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp new file mode 100644 index 00000000000..aed08a676b8 --- /dev/null +++ b/storage/src/tests/visiting/visitortest.cpp @@ -0,0 +1,1023 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/document/datatype/datatype.h> +#include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/fieldvalue/stringfieldvalue.h> +#include <vespa/document/fieldvalue/rawfieldvalue.h> +#include <vespa/log/log.h> +#include <vespa/storageapi/message/datagram.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/visiting/visitormanager.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/dummystoragelink.h> +#include <tests/storageserver/testvisitormessagesession.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vdslib/container/visitorordering.h> +#include <vespa/documentapi/messagebus/messages/multioperationmessage.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vector> +#include <thread> +#include <chrono> + +LOG_SETUP(".visitortest"); + +using namespace std::chrono_literals; + +namespace storage { + +namespace { + +using msg_ptr_vector = std::vector<api::StorageMessage::SP>; + +struct TestParams +{ + TestParams& iteratorsPerBucket(uint32_t n) { + _iteratorsPerBucket = n; + return *this; + } + TestParams& maxVisitorMemoryUsage(uint32_t bytes) { + _maxVisitorMemoryUsage = bytes; + return *this; + } + TestParams& parallelBuckets(uint32_t n) { + _parallelBuckets = n; + return *this; + } + TestParams& autoReplyError(const mbus::Error& error) { + _autoReplyError = error; + return *this; + } + + uint32_t _iteratorsPerBucket {1}; + uint32_t _maxVisitorMemoryUsage {UINT32_MAX}; + uint32_t _parallelBuckets {1}; + mbus::Error _autoReplyError; +}; + +} + +class VisitorTest : public CppUnit::TestFixture +{ +private: + CPPUNIT_TEST_SUITE(VisitorTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST(testFailedCreateIterator); + CPPUNIT_TEST(testFailedGetIter); + CPPUNIT_TEST(testMultipleFailedGetIter); + CPPUNIT_TEST(testDocumentAPIClientError); + CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor); + CPPUNIT_TEST(testIteratorCreatedForFailedVisitor); + CPPUNIT_TEST(testFailedDocumentAPISend); + CPPUNIT_TEST(testNoVisitorNotificationForTransientFailures); + CPPUNIT_TEST(testNotificationSentIfTransientErrorRetriedManyTimes); + CPPUNIT_TEST(testNoMbusTracingIfTraceLevelIsZero); + CPPUNIT_TEST(testReplyContainsTraceIfTraceLevelAboveZero); + CPPUNIT_TEST(testNoMoreIteratorsSentWhileMemoryUsedAboveLimit); + CPPUNIT_TEST(testDumpVisitorInvokesStrongReadConsistencyIteration); + CPPUNIT_TEST(testTestVisitorInvokesWeakReadConsistencyIteration); + CPPUNIT_TEST_SUITE_END(); + + static uint32_t docCount; + std::vector<document::Document::SP > _documents; + std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory; + std::unique_ptr<TestServiceLayerApp> _node; + std::unique_ptr<DummyStorageLink> _top; + DummyStorageLink* _bottom; + VisitorManager* _manager; + +public: + VisitorTest() : _node() {} + + void testNormalUsage(); + void testFailedCreateIterator(); + void testFailedGetIter(); + void testMultipleFailedGetIter(); + void testDocumentAPIClientError(); + void testNoDocumentAPIResendingForFailedVisitor(); + void testIteratorCreatedForFailedVisitor(); + void testFailedDocumentAPISend(); + void testNoVisitorNotificationForTransientFailures(); + void testNotificationSentIfTransientErrorRetriedManyTimes(); + void testNoMbusTracingIfTraceLevelIsZero(); + void testReplyContainsTraceIfTraceLevelAboveZero(); + void testNoMoreIteratorsSentWhileMemoryUsedAboveLimit(); + void testDumpVisitorInvokesStrongReadConsistencyIteration(); + void testTestVisitorInvokesWeakReadConsistencyIteration(); + // TODO: + void testVisitMultipleBuckets() {} + + // Not using setUp since can't throw exception out of it. + void initializeTest(const TestParams& params = TestParams()); + + struct VisitorOptions { + std::string visitorType{"dumpvisitor"}; + + VisitorOptions() {} + + VisitorOptions& withVisitorType(vespalib::stringref type) { + visitorType = type; + return *this; + } + }; + + std::shared_ptr<api::CreateVisitorCommand> makeCreateVisitor( + const VisitorOptions& options = VisitorOptions()); + void tearDown(); + bool waitUntilNoActiveVisitors(); + TestVisitorMessageSession& getSession(uint32_t n); + uint64_t verifyCreateVisitorReply( + api::ReturnCode::Result expectedResult, + int checkStatsDocsVisited = -1, + int checkStatsBytesVisited = -1); + void getMessagesAndReply( + int expectedCount, + TestVisitorMessageSession& session, + std::vector<document::Document::SP >& docs, + std::vector<document::DocumentId>& docIds, + std::vector<std::string>& infoMessages, + api::ReturnCode::Result returnCode = api::ReturnCode::OK); + uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs); + +private: + void doTestVisitorInstanceHasConsistencyLevel( + vespalib::stringref visitorType, + spi::ReadConsistency expectedConsistency); + + template <typename T> + std::vector<std::shared_ptr<T> > + fetchMultipleCommands(DummyStorageLink& link, size_t count); + + template <typename T> + std::shared_ptr<T> + fetchSingleCommand(DummyStorageLink& link); + + void sendGetIterReply(GetIterCommand& cmd, + const api::ReturnCode& result = + api::ReturnCode(api::ReturnCode::OK), + uint32_t maxDocuments = 0, + bool overrideCompleted = false); + void sendCreateIteratorReply(uint64_t iteratorId = 1234); + std::shared_ptr<api::CreateVisitorReply> doCompleteVisitingSession( + const std::shared_ptr<api::CreateVisitorCommand>& cmd); + + void sendInitialCreateVisitorAndGetIterRound(); + + int64_t getFailedVisitorDestinationReplyCount() const { + // There's no metric manager attached to these tests, so even if the + // test should magically freeze here for 5+ minutes, nothing should + // come in and wipe our accumulated failure metrics. + // Only 1 visitor thread running, so we know it has the metrics. + const auto& metrics = _manager->getThread(0).getMetrics(); + auto loadType = documentapi::LoadType::DEFAULT; + return metrics.visitorDestinationFailureReplies[loadType].getCount(); + } +}; + +uint32_t VisitorTest::docCount = 10; + +CPPUNIT_TEST_SUITE_REGISTRATION(VisitorTest); + +void +VisitorTest::initializeTest(const TestParams& params) +{ + LOG(debug, "Initializing test"); + vdstestlib::DirConfig config(getStandardConfig(true)); + config.getConfig("stor-visitor").set("visitorthreads", "1"); + config.getConfig("stor-visitor").set( + "iterators_per_bucket", + std::to_string(params._iteratorsPerBucket)); + config.getConfig("stor-visitor").set( + "defaultparalleliterators", + std::to_string(params._parallelBuckets)); + config.getConfig("stor-visitor").set( + "visitor_memory_usage_limit", + std::to_string(params._maxVisitorMemoryUsage)); + + system("chmod 755 vdsroot 2>/dev/null"); + system("rm -rf vdsroot* 2>/dev/null"); + assert(system("mkdir -p vdsroot/disks/d0") == 0); + assert(system("mkdir -p vdsroot/disks/d1") == 0); + + try { + _messageSessionFactory.reset( + new TestVisitorMessageSessionFactory(config.getConfigId())); + if (params._autoReplyError.getCode() != mbus::ErrorCode::NONE) { + _messageSessionFactory->_autoReplyError = params._autoReplyError; + _messageSessionFactory->_createAutoReplyVisitorSessions = true; + } + _node.reset(new TestServiceLayerApp(config.getConfigId())); + _top.reset(new DummyStorageLink()); + _top->push_back(std::unique_ptr<StorageLink>(_manager + = new VisitorManager( + config.getConfigId(), + _node->getComponentRegister(), *_messageSessionFactory))); + _bottom = new DummyStorageLink(); + _top->push_back(std::unique_ptr<StorageLink>(_bottom)); + _manager->setTimeBetweenTicks(10); + _top->open(); + } catch (config::InvalidConfigException& e) { + fprintf(stderr, "%s\n", e.what()); + } + std::string content( + "To be, or not to be: that is the question:\n" + "Whether 'tis nobler in the mind to suffer\n" + "The slings and arrows of outrageous fortune,\n" + "Or to take arms against a sea of troubles,\n" + "And by opposing end them? To die: to sleep;\n" + "No more; and by a sleep to say we end\n" + "The heart-ache and the thousand natural shocks\n" + "That flesh is heir to, 'tis a consummation\n" + "Devoutly to be wish'd. To die, to sleep;\n" + "To sleep: perchance to dream: ay, there's the rub;\n" + "For in that sleep of death what dreams may come\n" + "When we have shuffled off this mortal coil,\n" + "Must give us pause: there's the respect\n" + "That makes calamity of so long life;\n" + "For who would bear the whips and scorns of time,\n" + "The oppressor's wrong, the proud man's contumely,\n" + "The pangs of despised love, the law's delay,\n" + "The insolence of office and the spurns\n" + "That patient merit of the unworthy takes,\n" + "When he himself might his quietus make\n" + "With a bare bodkin? who would fardels bear,\n" + "To grunt and sweat under a weary life,\n" + "But that the dread of something after death,\n" + "The undiscover'd country from whose bourn\n" + "No traveller returns, puzzles the will\n" + "And makes us rather bear those ills we have\n" + "Than fly to others that we know not of?\n" + "Thus conscience does make cowards of us all;\n" + "And thus the native hue of resolution\n" + "Is sicklied o'er with the pale cast of thought,\n" + "And enterprises of great pith and moment\n" + "With this regard their currents turn awry,\n" + "And lose the name of action. - Soft you now!\n" + "The fair Ophelia! Nymph, in thy orisons\n" + "Be all my sins remember'd.\n"); + _documents.clear(); + for (uint32_t i=0; i<docCount; ++i) { + std::ostringstream uri; + uri << "userdoc:test:" << i % 10 << ":http://www.ntnu.no/" + << i << ".html"; + + _documents.push_back(document::Document::SP( + _node->getTestDocMan().createDocument(content, uri.str()))); + const document::DocumentType& type(_documents.back()->getType()); + _documents.back()->setValue(type.getField("headerval"), + document::IntFieldValue(i % 4)); + } + LOG(debug, "Done initializing test"); +} + +void +VisitorTest::tearDown() +{ + if (_top.get() != 0) { + _top->close(); + _top->flush(); + _top.reset(0); + } + _node.reset(0); + _messageSessionFactory.reset(0); + _manager = 0; +} + +bool +VisitorTest::waitUntilNoActiveVisitors() +{ + int i = 0; + for (; i < 1000; ++i) { + if (_manager->getActiveVisitorCount() == 0) { + return true; + } + std::this_thread::sleep_for(10ms); + } + return false; +} + +TestVisitorMessageSession& +VisitorTest::getSession(uint32_t n) +{ + // Wait until we have started the visitor + const std::vector<TestVisitorMessageSession*>& sessions( + _messageSessionFactory->_visitorSessions); + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000)); + while (true) { + { + vespalib::LockGuard lock(_messageSessionFactory->_accessLock); + if (sessions.size() > n) { + return *sessions[n]; + } + } + if (clock.getTimeInMillis() > endTime) { + throw vespalib::IllegalStateException( + "Timed out waiting for visitor session", VESPA_STRLOC); + } + std::this_thread::sleep_for(10ms); + } + throw std::logic_error("unreachable"); +} + +void +VisitorTest::getMessagesAndReply( + int expectedCount, + TestVisitorMessageSession& session, + std::vector<document::Document::SP >& docs, + std::vector<document::DocumentId>& docIds, + std::vector<std::string>& infoMessages, + api::ReturnCode::Result result) +{ + for (int i = 0; i < expectedCount; i++) { + session.waitForMessages(1); + mbus::Reply::UP reply; + { + vespalib::MonitorGuard guard(session.getMonitor()); + CPPUNIT_ASSERT(!session.sentMessages.empty()); + vespalib::LinkedPtr<documentapi::DocumentMessage> msg( + session.sentMessages.front()); + CPPUNIT_ASSERT(msg->getPriority() < 16); + + switch (msg->getType()) { + case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT: + docs.push_back( + static_cast<documentapi::PutDocumentMessage&>(*msg) + .getDocument()); + break; + case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + docIds.push_back( + static_cast<documentapi::RemoveDocumentMessage&>(*msg) + .getDocumentId()); + break; + case documentapi::DocumentProtocol::MESSAGE_VISITORINFO: + infoMessages.push_back( + static_cast<documentapi::VisitorInfoMessage&>(*msg) + .getErrorMessage()); + break; + default: + break; + } + + reply = msg->createReply(); + reply->swapState(*msg); + + session.sentMessages.pop_front(); // Release linked ptr ref. + reply->setMessage(mbus::Message::UP(msg.release())); + + if (result != api::ReturnCode::OK) { + reply->addError(mbus::Error(result, "Generic error")); + } + } + session.reply(std::move(reply)); + } +} + +uint64_t +VisitorTest::verifyCreateVisitorReply( + api::ReturnCode::Result expectedResult, + int checkStatsDocsVisited, + int checkStatsBytesVisited) +{ + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL(1, (int)replies.size()); + + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType()); + + std::shared_ptr<api::CreateVisitorReply> reply( + std::dynamic_pointer_cast<api::CreateVisitorReply>(msg)); + CPPUNIT_ASSERT(reply.get()); + CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult()); + + if (checkStatsDocsVisited >= 0) { + CPPUNIT_ASSERT_EQUAL(checkStatsDocsVisited, + int(reply->getVisitorStatistics().getDocumentsVisited())); + } + if (checkStatsBytesVisited >= 0) { + CPPUNIT_ASSERT_EQUAL(checkStatsBytesVisited, + int(reply->getVisitorStatistics().getBytesVisited())); + } + + return reply->getMsgId(); +} + +uint32_t +VisitorTest::getMatchingDocuments(std::vector<document::Document::SP >& docs) { + uint32_t equalCount = 0; + for (uint32_t i=0; i<docs.size(); ++i) { + for (uint32_t j=0; j<_documents.size(); ++j) { + if (*docs[i] == *_documents[j] && + docs[i]->getId() == _documents[j]->getId()) + { + equalCount++; + } + } + } + + return equalCount; +} + +void +VisitorTest::sendGetIterReply(GetIterCommand& cmd, + const api::ReturnCode& result, + uint32_t maxDocuments, + bool overrideCompleted) +{ + GetIterReply::SP reply(new GetIterReply(cmd)); + if (result.failed()) { + reply->setResult(result); + _bottom->sendUp(reply); + return; + } + assert(maxDocuments < _documents.size()); + size_t documentCount = maxDocuments != 0 ? maxDocuments : _documents.size(); + for (size_t i = 0; i < documentCount; ++i) { + reply->getEntries().push_back( + spi::DocEntry::LP( + new spi::DocEntry( + spi::Timestamp(1000 + i), + spi::NONE, + document::Document::UP(_documents[i]->clone())))); + } + if (documentCount == _documents.size() || overrideCompleted) { + reply->setCompleted(); + } + _bottom->sendUp(reply); +} + +template <typename T> +std::vector<std::shared_ptr<T> > +VisitorTest::fetchMultipleCommands(DummyStorageLink& link, size_t count) +{ + link.waitForMessages(count, 60); + std::vector<api::StorageMessage::SP> msgs(link.getCommandsOnce()); + std::vector<std::shared_ptr<T> > fetched; + if (msgs.size() != count) { + std::ostringstream oss; + oss << "Expected " + << count + << " messages, got " + << msgs.size() + << ":\n"; + for (size_t i = 0; i < msgs.size(); ++i) { + oss << i << ": " << *msgs[i] << "\n"; + } + CPPUNIT_FAIL(oss.str()); + } + for (size_t i = 0; i < count; ++i) { + std::shared_ptr<T> ret(std::dynamic_pointer_cast<T>(msgs[i])); + if (!ret) { + std::ostringstream oss; + oss << "Expected message of type " + << typeid(T).name() + << ", but got " + << msgs[0]->toString(); + CPPUNIT_FAIL(oss.str()); + } + fetched.push_back(ret); + } + return fetched; +} + +template <typename T> +std::shared_ptr<T> +VisitorTest::fetchSingleCommand(DummyStorageLink& link) +{ + std::vector<std::shared_ptr<T> > ret( + fetchMultipleCommands<T>(link, 1)); + return ret[0]; +} + +std::shared_ptr<api::CreateVisitorCommand> +VisitorTest::makeCreateVisitor(const VisitorOptions& options) +{ + api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + std::shared_ptr<api::CreateVisitorCommand> cmd( + new api::CreateVisitorCommand(options.visitorType, "testvis", "")); + cmd->addBucketToBeVisited(document::BucketId(16, 3)); + cmd->setAddress(address); + cmd->setMaximumPendingReplyCount(UINT32_MAX); + cmd->setControlDestination("foo/bar"); + return cmd; +} + +void +VisitorTest::sendCreateIteratorReply(uint64_t iteratorId) +{ + CreateIteratorCommand::SP createCmd( + fetchSingleCommand<CreateIteratorCommand>(*_bottom)); + spi::IteratorId id(iteratorId); + api::StorageReply::SP reply( + new CreateIteratorReply(*createCmd, id)); + _bottom->sendUp(reply); +} + +void +VisitorTest::testNormalUsage() +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + + CreateIteratorCommand::SP createCmd( + fetchSingleCommand<CreateIteratorCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(uint8_t(0), createCmd->getPriority()); // Highest pri + spi::IteratorId id(1234); + api::StorageReply::SP reply( + new CreateIteratorReply(*createCmd, id)); + _bottom->sendUp(reply); + + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + + sendGetIterReply(*getIterCmd); + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size()); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::OK); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); + CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount()); +} + +void +VisitorTest::testFailedCreateIterator() +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + cmd->addBucketToBeVisited(document::BucketId(16, 4)); + _top->sendDown(cmd); + + CreateIteratorCommand::SP createCmd( + fetchSingleCommand<CreateIteratorCommand>(*_bottom)); + spi::IteratorId id(0); + api::StorageReply::SP reply( + new CreateIteratorReply(*createCmd, id)); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE)); + _bottom->sendUp(reply); + + verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE, 0, 0); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::testFailedGetIter() +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + + sendGetIterReply(*getIterCmd, + api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::BUCKET_NOT_FOUND, 0, 0); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::testMultipleFailedGetIter() +{ + initializeTest(TestParams().iteratorsPerBucket(2)); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + std::vector<GetIterCommand::SP> getIterCmds( + fetchMultipleCommands<GetIterCommand>(*_bottom, 2)); + + sendGetIterReply(*getIterCmds[0], + api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); + + // Wait for an "appropriate" amount of time so that wrongful logic + // will send a DestroyIteratorCommand before all pending GetIters + // have been replied to. + std::this_thread::sleep_for(100ms); + + CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands()); + + sendGetIterReply(*getIterCmds[1], + api::ReturnCode(api::ReturnCode::BUCKET_DELETED)); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::testDocumentAPIClientError() +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + { + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + + sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1); + } + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::INTERNAL_FAILURE); + // INTERNAL_FAILURE is critical, so no visitor info sent + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + + std::this_thread::sleep_for(100ms); + + { + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + + sendGetIterReply(*getIterCmd); + } + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::testNoDocumentAPIResendingForFailedVisitor() +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + { + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + + sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 2, true); + } + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + // Use non-critical result. Visitor info message should be received + // after we send a NOT_CONNECTED reply. Failing this message as well + // should cause the entire visitor to fail. + getMessagesAndReply(3, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::NOT_CONNECTED); + CPPUNIT_ASSERT_EQUAL(size_t(1), infoMessages.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("[From content node 0] NOT_CONNECTED: Generic error"), + infoMessages[0]); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::NOT_CONNECTED); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); + CPPUNIT_ASSERT_EQUAL(3L, getFailedVisitorDestinationReplyCount()); +} + +void +VisitorTest::testIteratorCreatedForFailedVisitor() +{ + initializeTest(TestParams().iteratorsPerBucket(1).parallelBuckets(2)); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + cmd->addBucketToBeVisited(document::BucketId(16, 4)); + _top->sendDown(cmd); + + std::vector<CreateIteratorCommand::SP> createCmds( + fetchMultipleCommands<CreateIteratorCommand>(*_bottom, 2)); + { + spi::IteratorId id(0); + api::StorageReply::SP reply( + new CreateIteratorReply(*createCmds[0], id)); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE)); + _bottom->sendUp(reply); + } + { + spi::IteratorId id(1234); + api::StorageReply::SP reply( + new CreateIteratorReply(*createCmds[1], id)); + _bottom->sendUp(reply); + } + // Want to immediately receive destroyiterator for newly created + // iterator, since we cannot use it anyway when the visitor has failed. + DestroyIteratorCommand::SP destroyCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE, 0, 0); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +/** + * Test that if a visitor fails to send a document API message outright + * (i.e. a case where it will never get a reply), the session is failed + * and the visitor terminates cleanly without counting the failed message + * as pending. + */ +void +VisitorTest::testFailedDocumentAPISend() +{ + initializeTest(TestParams().autoReplyError( + mbus::Error(mbus::ErrorCode::HANDSHAKE_FAILED, + "abandon ship!"))); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + cmd->addBucketToBeVisited(document::BucketId(16, 4)); + _top->sendDown(cmd); + + sendCreateIteratorReply(); + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + sendGetIterReply(*getIterCmd, + api::ReturnCode(api::ReturnCode::OK), + 2, + true); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply( + static_cast<api::ReturnCode::Result>( + mbus::ErrorCode::HANDSHAKE_FAILED), + 0, + 0); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); + // We currently don't count failures to send in this metric; send failures + // indicate a message bus problem and already log a warning when they happen + CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount()); +} + +void +VisitorTest::sendInitialCreateVisitorAndGetIterRound() +{ + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + { + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), + 1, true); + } +} + +void +VisitorTest::testNoVisitorNotificationForTransientFailures() +{ + initializeTest(); + sendInitialCreateVisitorAndGetIterRound(); + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + // Have to make sure time increases in visitor thread so that resend + // times are reached. + _node->getClock().setFakeCycleMode(); + // Should not get info message for BUCKET_DELETED, but resend of Put. + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::BUCKET_DELETED); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + // Should not get info message for BUCKET_NOT_FOUND, but resend of Put. + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::BUCKET_NOT_FOUND); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + // MessageBus error codes guaranteed to fit in return code result. + // Should not get info message for SESSION_BUSY, but resend of Put. + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + static_cast<api::ReturnCode::Result>( + mbus::ErrorCode::SESSION_BUSY)); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + // WRONG_DISTRIBUTION should not be reported, as it will happen all the + // time when initiating remote migrations et al. + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::WRONG_DISTRIBUTION); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + + // Complete message successfully to finish the visitor. + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::OK); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + + fetchSingleCommand<DestroyIteratorCommand>(*_bottom); + + verifyCreateVisitorReply(api::ReturnCode::OK); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::testNotificationSentIfTransientErrorRetriedManyTimes() +{ + constexpr size_t retries( + Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY); + + initializeTest(); + sendInitialCreateVisitorAndGetIterRound(); + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + // Have to make sure time increases in visitor thread so that resend + // times are reached. + _node->getClock().setFakeCycleMode(); + for (size_t attempt = 0; attempt < retries; ++attempt) { + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::WRONG_DISTRIBUTION); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + } + // Should now have a client notification along for the ride. + // This has to be ACKed as OK or the visitor will fail. + getMessagesAndReply(2, getSession(0), docs, docIds, infoMessages, + api::ReturnCode::OK); + CPPUNIT_ASSERT_EQUAL(size_t(1), infoMessages.size()); + // TODO(vekterli) ideally we'd want to test that this happens only once + // per message, but this seems frustratingly complex to do currently. + fetchSingleCommand<DestroyIteratorCommand>(*_bottom); + + verifyCreateVisitorReply(api::ReturnCode::OK); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +std::shared_ptr<api::CreateVisitorReply> +VisitorTest::doCompleteVisitingSession( + const std::shared_ptr<api::CreateVisitorCommand>& cmd) +{ + initializeTest(); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + sendGetIterReply(*getIterCmd, + api::ReturnCode(api::ReturnCode::OK), + 1, + true); + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + _top->waitForMessages(1, 60); + const msg_ptr_vector replies = _top->getRepliesOnce(); + CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size()); + + std::shared_ptr<api::StorageMessage> msg(replies[0]); + + CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, + msg->getType()); + return std::dynamic_pointer_cast<api::CreateVisitorReply>(msg); +} + +void +VisitorTest::testNoMbusTracingIfTraceLevelIsZero() +{ + std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); + cmd->getTrace().setLevel(0); + auto reply = doCompleteVisitingSession(cmd); + CPPUNIT_ASSERT(reply->getTrace().getRoot().isEmpty()); +} + +void +VisitorTest::testReplyContainsTraceIfTraceLevelAboveZero() +{ + std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); + cmd->getTrace().setLevel(1); + auto reply = doCompleteVisitingSession(cmd); + CPPUNIT_ASSERT(!reply->getTrace().getRoot().isEmpty()); +} + +void +VisitorTest::testNoMoreIteratorsSentWhileMemoryUsedAboveLimit() +{ + initializeTest(TestParams().maxVisitorMemoryUsage(1) + .parallelBuckets(1) + .iteratorsPerBucket(1)); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor()); + _top->sendDown(cmd); + sendCreateIteratorReply(); + + GetIterCommand::SP getIterCmd( + fetchSingleCommand<GetIterCommand>(*_bottom)); + sendGetIterReply(*getIterCmd, + api::ReturnCode(api::ReturnCode::OK), + 1); + + // Pending Document API message towards client; memory usage should prevent + // visitor from sending down additional GetIter messages until the pending + // client message has been replied to and cleared from the internal state. + getSession(0).waitForMessages(1); + // Note that it's possible for this test to exhibit false negatives (but not + // false positives) since the _absence_ of a message means we don't have any + // kind of explicit barrier with which we can synchronize the test and the + // running visitor thread. + std::this_thread::sleep_for(100ms); + CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands()); + + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages); + + // 2nd round of GetIter now allowed. Send reply indicating completion. + getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom); + sendGetIterReply(*getIterCmd, + api::ReturnCode(api::ReturnCode::OK), + 1, + true); + + getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages); + + DestroyIteratorCommand::SP destroyIterCmd( + fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + + verifyCreateVisitorReply(api::ReturnCode::OK); + CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); +} + +void +VisitorTest::doTestVisitorInstanceHasConsistencyLevel( + vespalib::stringref visitorType, + spi::ReadConsistency expectedConsistency) +{ + initializeTest(); + std::shared_ptr<api::CreateVisitorCommand> cmd( + makeCreateVisitor(VisitorOptions().withVisitorType(visitorType))); + _top->sendDown(cmd); + + auto createCmd = fetchSingleCommand<CreateIteratorCommand>(*_bottom); + CPPUNIT_ASSERT_EQUAL(expectedConsistency, + createCmd->getReadConsistency()); +} + +void +VisitorTest::testDumpVisitorInvokesStrongReadConsistencyIteration() +{ + doTestVisitorInstanceHasConsistencyLevel( + "dumpvisitor", spi::ReadConsistency::STRONG); +} + +// NOTE: SearchVisitor cannot be tested here since it's in a separate module +// which depends on _this_ module for compilation. Instead we let TestVisitor +// use weak consistency, as this is just some internal stuff not used for/by +// any external client use cases. Our primary concern is to test that each +// visitor subclass might report its own read consistency requirement and that +// this is carried along to the CreateIteratorCommand. +void +VisitorTest::testTestVisitorInvokesWeakReadConsistencyIteration() +{ + doTestVisitorInstanceHasConsistencyLevel( + "testvisitor", spi::ReadConsistency::WEAK); +} + +} // namespace storage |