aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/visiting
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/visiting
Publish
Diffstat (limited to 'storage/src/tests/visiting')
-rw-r--r--storage/src/tests/visiting/.gitignore12
-rw-r--r--storage/src/tests/visiting/CMakeLists.txt11
-rw-r--r--storage/src/tests/visiting/commandqueuetest.cpp223
-rw-r--r--storage/src/tests/visiting/memory_bounded_trace_test.cpp131
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp1172
-rw-r--r--storage/src/tests/visiting/visitortest.cpp1023
6 files changed, 2572 insertions, 0 deletions
diff --git a/storage/src/tests/visiting/.gitignore b/storage/src/tests/visiting/.gitignore
new file mode 100644
index 00000000000..184e5d1c936
--- /dev/null
+++ b/storage/src/tests/visiting/.gitignore
@@ -0,0 +1,12 @@
+*.So
+*.lo
+*.o
+.*.swp
+.config.log
+.depend
+.depend.NEW
+.deps
+.libs
+Makefile
+testrunner
+testrunner.core
diff --git a/storage/src/tests/visiting/CMakeLists.txt b/storage/src/tests/visiting/CMakeLists.txt
new file mode 100644
index 00000000000..60e130c003c
--- /dev/null
+++ b/storage/src/tests/visiting/CMakeLists.txt
@@ -0,0 +1,11 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(storage_testvisiting
+ SOURCES
+ commandqueuetest.cpp
+ visitormanagertest.cpp
+ visitortest.cpp
+ memory_bounded_trace_test.cpp
+ DEPENDS
+ AFTER
+ storage_storageconfig
+)
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
new file mode 100644
index 00000000000..5d6da5f7ea5
--- /dev/null
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -0,0 +1,223 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
+#include <vespa/storage/visiting/commandqueue.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+
+using vespalib::string;
+
+namespace storage {
+
+struct CommandQueueTest : public CppUnit::TestFixture
+{
+ void testFIFO();
+ void testFIFOWithPriorities();
+ void testReleaseOldest();
+ void testReleaseLowestPriority();
+ void testDeleteIterator();
+
+ CPPUNIT_TEST_SUITE(CommandQueueTest);
+ CPPUNIT_TEST(testFIFO);
+ CPPUNIT_TEST(testFIFOWithPriorities);
+ CPPUNIT_TEST(testReleaseOldest);
+ CPPUNIT_TEST(testReleaseLowestPriority);
+ CPPUNIT_TEST(testDeleteIterator);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(CommandQueueTest);
+
+namespace {
+ std::shared_ptr<api::CreateVisitorCommand> getCommand(
+ const vespalib::stringref & name, int timeout,
+ uint8_t priority = 0)
+ {
+ vespalib::asciistream ost;
+ ost << name << " t=" << timeout << " p=" << static_cast<unsigned int>(priority);
+ // Piggyback name in document selection
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("", "", ost.str()));
+ cmd->setQueueTimeout(timeout);
+ cmd->setPriority(priority);
+ return cmd;
+ }
+
+ const vespalib::string &
+ getCommandString(const std::shared_ptr<api::CreateVisitorCommand>& cmd)
+ {
+ return cmd->getDocumentSelection();
+ }
+
+}
+
+void CommandQueueTest::testFIFO() {
+ framework::defaultimplementation::FakeClock clock;
+ CommandQueue<api::CreateVisitorCommand> queue(clock);
+ CPPUNIT_ASSERT(queue.empty());
+ // Use all default priorities, meaning what comes out should be in the same order
+ // as what went in
+ queue.add(getCommand("first", 1));
+ queue.add(getCommand("second", 10));
+ queue.add(getCommand("third", 5));
+ queue.add(getCommand("fourth", 0));
+ queue.add(getCommand("fifth", 3));
+ queue.add(getCommand("sixth", 14));
+ queue.add(getCommand("seventh", 7));
+
+ CPPUNIT_ASSERT(!queue.empty());
+ std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands;
+ for (;;) {
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ queue.releaseNextCommand().first);
+ if (cmd.get() == 0) break;
+ commands.push_back(cmd);
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size());
+ CPPUNIT_ASSERT_EQUAL(string("first t=1 p=0"), getCommandString(commands[0]));
+ CPPUNIT_ASSERT_EQUAL(string("second t=10 p=0"), getCommandString(commands[1]));
+ CPPUNIT_ASSERT_EQUAL(string("third t=5 p=0"), getCommandString(commands[2]));
+ CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=0"), getCommandString(commands[3]));
+ CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=0"), getCommandString(commands[4]));
+ CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=0"), getCommandString(commands[5]));
+ CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[6]));
+}
+
+void CommandQueueTest::testFIFOWithPriorities() {
+ framework::defaultimplementation::FakeClock clock;
+ CommandQueue<api::CreateVisitorCommand> queue(clock);
+ CPPUNIT_ASSERT(queue.empty());
+
+ queue.add(getCommand("first", 1, 10));
+ CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(queue.peekLowestPriorityCommand()));
+ queue.add(getCommand("second", 10, 22));
+ queue.add(getCommand("third", 5, 9));
+ CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(queue.peekLowestPriorityCommand()));
+ queue.add(getCommand("fourth", 0, 22));
+ queue.add(getCommand("fifth", 3, 22));
+ CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(queue.peekLowestPriorityCommand()));
+ queue.add(getCommand("sixth", 14, 50));
+ queue.add(getCommand("seventh", 7, 0));
+
+ CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(queue.peekLowestPriorityCommand()));
+
+ CPPUNIT_ASSERT(!queue.empty());
+ std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands;
+ for (;;) {
+ std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekNextCommand());
+ std::shared_ptr<api::CreateVisitorCommand> cmd(queue.releaseNextCommand().first);
+ if (cmd.get() == 0 || cmdPeek != cmd) break;
+ commands.push_back(cmd);
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size());
+ CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[0]));
+ CPPUNIT_ASSERT_EQUAL(string("third t=5 p=9"), getCommandString(commands[1]));
+ CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(commands[2]));
+ CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(commands[3]));
+ CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=22"), getCommandString(commands[4]));
+ CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(commands[5]));
+ CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(commands[6]));
+}
+
+void CommandQueueTest::testReleaseOldest() {
+ framework::defaultimplementation::FakeClock clock(framework::defaultimplementation::FakeClock::FAKE_ABSOLUTE);
+ CommandQueue<api::CreateVisitorCommand> queue(clock);
+ CPPUNIT_ASSERT(queue.empty());
+ queue.add(getCommand("first", 10));
+ queue.add(getCommand("second", 100));
+ queue.add(getCommand("third", 1000));
+ queue.add(getCommand("fourth", 5));
+ queue.add(getCommand("fifth", 3000));
+ queue.add(getCommand("sixth", 400));
+ queue.add(getCommand("seventh", 700));
+ CPPUNIT_ASSERT_EQUAL(7u, queue.size());
+
+ typedef CommandQueue<api::CreateVisitorCommand>::CommandEntry CommandEntry;
+ std::list<CommandEntry> timedOut(queue.releaseTimedOut());
+ CPPUNIT_ASSERT(timedOut.empty());
+ clock.addMilliSecondsToTime(400 * 1000);
+ timedOut = queue.releaseTimedOut();
+ CPPUNIT_ASSERT_EQUAL(size_t(4), timedOut.size());
+ std::ostringstream ost;
+ for (std::list<CommandEntry>::const_iterator it = timedOut.begin();
+ it != timedOut.end(); ++it)
+ {
+ ost << getCommandString(it->_command) << "\n";
+ }
+ CPPUNIT_ASSERT_EQUAL(std::string(
+ "fourth t=5 p=0\n"
+ "first t=10 p=0\n"
+ "second t=100 p=0\n"
+ "sixth t=400 p=0\n"), ost.str());
+ CPPUNIT_ASSERT_EQUAL(3u, queue.size());
+}
+
+void CommandQueueTest::testReleaseLowestPriority() {
+ framework::defaultimplementation::FakeClock clock;
+ CommandQueue<api::CreateVisitorCommand> queue(clock);
+ CPPUNIT_ASSERT(queue.empty());
+
+ queue.add(getCommand("first", 1, 10));
+ queue.add(getCommand("second", 10, 22));
+ queue.add(getCommand("third", 5, 9));
+ queue.add(getCommand("fourth", 0, 22));
+ queue.add(getCommand("fifth", 3, 22));
+ queue.add(getCommand("sixth", 14, 50));
+ queue.add(getCommand("seventh", 7, 0));
+ CPPUNIT_ASSERT_EQUAL(7u, queue.size());
+
+ std::vector<std::shared_ptr<api::CreateVisitorCommand> > commands;
+ for (;;) {
+ std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand());
+ std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd(
+ queue.releaseLowestPriorityCommand());
+ if (cmd.first.get() == 0 || cmdPeek != cmd.first) break;
+ commands.push_back(cmd.first);
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(7), commands.size());
+ CPPUNIT_ASSERT_EQUAL(string("sixth t=14 p=50"), getCommandString(commands[0]));
+ CPPUNIT_ASSERT_EQUAL(string("fifth t=3 p=22"), getCommandString(commands[1]));
+ CPPUNIT_ASSERT_EQUAL(string("fourth t=0 p=22"), getCommandString(commands[2]));
+ CPPUNIT_ASSERT_EQUAL(string("second t=10 p=22"), getCommandString(commands[3]));
+ CPPUNIT_ASSERT_EQUAL(string("first t=1 p=10"), getCommandString(commands[4]));
+ CPPUNIT_ASSERT_EQUAL(string("third t=5 p=9"), getCommandString(commands[5]));
+ CPPUNIT_ASSERT_EQUAL(string("seventh t=7 p=0"), getCommandString(commands[6]));
+}
+
+void CommandQueueTest::testDeleteIterator() {
+ framework::defaultimplementation::FakeClock clock;
+ CommandQueue<api::CreateVisitorCommand> queue(clock);
+ CPPUNIT_ASSERT(queue.empty());
+ queue.add(getCommand("first", 10));
+ queue.add(getCommand("second", 100));
+ queue.add(getCommand("third", 1000));
+ queue.add(getCommand("fourth", 5));
+ queue.add(getCommand("fifth", 3000));
+ queue.add(getCommand("sixth", 400));
+ queue.add(getCommand("seventh", 700));
+ CPPUNIT_ASSERT_EQUAL(7u, queue.size());
+
+ CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin();
+ ++it; ++it;
+ queue.erase(it);
+ CPPUNIT_ASSERT_EQUAL(6u, queue.size());
+
+ std::vector<std::shared_ptr<api::CreateVisitorCommand> > cmds;
+ for (;;) {
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ std::dynamic_pointer_cast<api::CreateVisitorCommand>(
+ queue.releaseNextCommand().first));
+ if (cmd.get() == 0) break;
+ cmds.push_back(cmd);
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(6), cmds.size());
+ CPPUNIT_ASSERT_EQUAL(string("first t=10 p=0"), getCommandString(cmds[0]));
+ CPPUNIT_ASSERT_EQUAL(string("second t=100 p=0"), getCommandString(cmds[1]));
+ CPPUNIT_ASSERT_EQUAL(string("fourth t=5 p=0"), getCommandString(cmds[2]));
+ CPPUNIT_ASSERT_EQUAL(string("fifth t=3000 p=0"), getCommandString(cmds[3]));
+ CPPUNIT_ASSERT_EQUAL(string("sixth t=400 p=0"), getCommandString(cmds[4]));
+ CPPUNIT_ASSERT_EQUAL(string("seventh t=700 p=0"), getCommandString(cmds[5]));
+}
+
+}
+
diff --git a/storage/src/tests/visiting/memory_bounded_trace_test.cpp b/storage/src/tests/visiting/memory_bounded_trace_test.cpp
new file mode 100644
index 00000000000..85eae12fc34
--- /dev/null
+++ b/storage/src/tests/visiting/memory_bounded_trace_test.cpp
@@ -0,0 +1,131 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/storage/visiting/memory_bounded_trace.h>
+
+namespace storage {
+
+class MemoryBoundedTraceTest : public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE(MemoryBoundedTraceTest);
+ CPPUNIT_TEST(noMemoryReportedUsedWhenEmpty);
+ CPPUNIT_TEST(memoryUsedIsStringLengthForLeafNode);
+ CPPUNIT_TEST(memoryUsedIsAccumulatedRecursivelyForNonLeafNodes);
+ CPPUNIT_TEST(traceNodesCanBeMovedAndImplicitlyCleared);
+ CPPUNIT_TEST(movedTraceTreeIsMarkedAsStrict);
+ CPPUNIT_TEST(canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound);
+ CPPUNIT_TEST(movedTreeIncludesStatsNodeWhenNodesOmitted);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ void noMemoryReportedUsedWhenEmpty();
+ void memoryUsedIsStringLengthForLeafNode();
+ void memoryUsedIsAccumulatedRecursivelyForNonLeafNodes();
+ void traceNodesCanBeMovedAndImplicitlyCleared();
+ void movedTraceTreeIsMarkedAsStrict();
+ void canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound();
+ void movedTreeIncludesStatsNodeWhenNodesOmitted();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(MemoryBoundedTraceTest);
+
+void
+MemoryBoundedTraceTest::noMemoryReportedUsedWhenEmpty()
+{
+ MemoryBoundedTrace trace(100);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), trace.getApproxMemoryUsed());
+}
+
+void
+MemoryBoundedTraceTest::memoryUsedIsStringLengthForLeafNode()
+{
+ MemoryBoundedTrace trace(100);
+ CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0)));
+ CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed());
+}
+
+void
+MemoryBoundedTraceTest::memoryUsedIsAccumulatedRecursivelyForNonLeafNodes()
+{
+ MemoryBoundedTrace trace(100);
+ mbus::TraceNode innerNode;
+ innerNode.addChild("hello world");
+ innerNode.addChild("goodbye moon");
+ CPPUNIT_ASSERT(trace.add(innerNode));
+ CPPUNIT_ASSERT_EQUAL(size_t(23), trace.getApproxMemoryUsed());
+}
+
+void
+MemoryBoundedTraceTest::traceNodesCanBeMovedAndImplicitlyCleared()
+{
+ MemoryBoundedTrace trace(100);
+ CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0)));
+ mbus::TraceNode target;
+ trace.moveTraceTo(target);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), trace.getApproxMemoryUsed());
+
+ mbus::TraceNode emptinessCheck;
+ trace.moveTraceTo(emptinessCheck);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(0), emptinessCheck.getNumChildren());
+}
+
+/**
+ * We want trace subtrees to be strictly ordered so that the message about
+ * omitted traces will remain soundly as the last ordered node. There is no
+ * particular performance reason for not having strict mode enabled to the
+ * best of my knowledge, since the internal backing data structure is an
+ * ordered vector anyhow.
+ */
+void
+MemoryBoundedTraceTest::movedTraceTreeIsMarkedAsStrict()
+{
+ MemoryBoundedTrace trace(100);
+ CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0)));
+ mbus::TraceNode target;
+ trace.moveTraceTo(target);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren());
+ CPPUNIT_ASSERT(target.getChild(0).isStrict());
+}
+
+void
+MemoryBoundedTraceTest::canNotAddMoreNodesWhenMemoryUsedExceedsUpperBound()
+{
+ // Note: we allow one complete node tree to exceed the bounds, but as soon
+ // as the bound is exceeded no further nodes can be added.
+ MemoryBoundedTrace trace(10);
+ CPPUNIT_ASSERT(trace.add(mbus::TraceNode("hello world", 0)));
+ CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed());
+
+ CPPUNIT_ASSERT(!trace.add(mbus::TraceNode("the quick red fox runs across "
+ "the freeway", 0)));
+ CPPUNIT_ASSERT_EQUAL(size_t(11), trace.getApproxMemoryUsed());
+
+ mbus::TraceNode target;
+ trace.moveTraceTo(target);
+ // Twice nested node (root -> added trace tree -> leaf with txt).
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren());
+ CPPUNIT_ASSERT(target.getChild(0).getNumChildren() >= 1);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("hello world"),
+ target.getChild(0).getChild(0).getNote());
+}
+
+void
+MemoryBoundedTraceTest::movedTreeIncludesStatsNodeWhenNodesOmitted()
+{
+ MemoryBoundedTrace trace(5);
+ CPPUNIT_ASSERT(trace.add(mbus::TraceNode("abcdef", 0)));
+ CPPUNIT_ASSERT(!trace.add(mbus::TraceNode("ghijkjlmn", 0)));
+
+ mbus::TraceNode target;
+ trace.moveTraceTo(target);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), target.getNumChildren());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(2), target.getChild(0).getNumChildren());
+ vespalib::string expected("Trace too large; omitted 1 subsequent trace "
+ "trees containing a total of 9 bytes");
+ CPPUNIT_ASSERT_EQUAL(expected, target.getChild(0).getChild(1).getNote());
+}
+
+} // storage
+
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
new file mode 100644
index 00000000000..d782abf7d54
--- /dev/null
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -0,0 +1,1172 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/document/datatype/datatype.h>
+#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/fieldvalue/stringfieldvalue.h>
+#include <vespa/document/fieldvalue/rawfieldvalue.h>
+#include <vespa/log/log.h>
+#include <vespa/storageapi/message/datagram.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vector>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/visiting/visitormanager.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/dummystoragelink.h>
+#include <tests/storageserver/testvisitormessagesession.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdslib/container/visitorordering.h>
+#include <vespa/documentapi/messagebus/messages/multioperationmessage.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+
+
+LOG_SETUP(".visitormanagertest");
+
+namespace storage {
+namespace {
+ typedef std::vector<api::StorageMessage::SP> msg_ptr_vector;
+}
+
+class VisitorManagerTest : public CppUnit::TestFixture
+{
+private:
+ CPPUNIT_TEST_SUITE(VisitorManagerTest);
+ CPPUNIT_TEST(testNormalUsage);
+ CPPUNIT_TEST(testResending);
+ CPPUNIT_TEST(testVisitEmptyBucket);
+ CPPUNIT_TEST(testMultiBucketVisit);
+ CPPUNIT_TEST(testNoBuckets);
+ CPPUNIT_TEST(testVisitPutsAndRemoves);
+ CPPUNIT_TEST(testVisitWithTimeframeAndSelection);
+ CPPUNIT_TEST(testVisitWithTimeframeAndBogusSelection);
+ CPPUNIT_TEST(testVisitorCallbacks);
+ CPPUNIT_TEST(testVisitorCleanup);
+ CPPUNIT_TEST(testAbortOnFailedVisitorInfo);
+ CPPUNIT_TEST(testAbortOnFieldPathError);
+ CPPUNIT_TEST(testVisitorQueueTimeout);
+ CPPUNIT_TEST(testVisitorProcessingTimeout);
+ CPPUNIT_TEST(testPrioritizedVisitorQueing);
+ CPPUNIT_TEST(testPrioritizedMaxConcurrentVisitors);
+ CPPUNIT_TEST(testVisitorQueingZeroQueueSize);
+ CPPUNIT_TEST(testHitCounter);
+ CPPUNIT_TEST(testStatusPage);
+ CPPUNIT_TEST_SUITE_END();
+
+ static uint32_t docCount;
+ std::vector<document::Document::SP > _documents;
+ std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory;
+ std::unique_ptr<TestServiceLayerApp> _node;
+ std::unique_ptr<DummyStorageLink> _top;
+ VisitorManager* _manager;
+
+public:
+ VisitorManagerTest() : _node() {}
+
+ // Not using setUp since can't throw exception out of it.
+ void initializeTest();
+ void addSomeRemoves(bool removeAll = false);
+ void tearDown();
+ TestVisitorMessageSession& getSession(uint32_t n);
+ uint64_t verifyCreateVisitorReply(
+ api::ReturnCode::Result expectedResult,
+ int checkStatsDocsVisited = -1,
+ int checkStatsBytesVisited = -1);
+ void getMessagesAndReply(
+ int expectedCount,
+ TestVisitorMessageSession& session,
+ std::vector<document::Document::SP >& docs,
+ std::vector<document::DocumentId>& docIds,
+ api::ReturnCode::Result returnCode = api::ReturnCode::OK,
+ documentapi::Priority::Value priority = documentapi::Priority::PRI_NORMAL_4);
+ uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs);
+
+ void testNormalUsage();
+ void testResending();
+ void testVisitEmptyBucket();
+ void testMultiBucketVisit();
+ void testNoBuckets();
+ void testVisitPutsAndRemoves();
+ void testVisitWithTimeframeAndSelection();
+ void testVisitWithTimeframeAndBogusSelection();
+ void testVisitorCallbacks();
+ void testVisitorCleanup();
+ void testAbortOnFailedVisitorInfo();
+ void testAbortOnFieldPathError();
+ void testVisitorQueueTimeout();
+ void testVisitorProcessingTimeout();
+ void testPrioritizedVisitorQueing();
+ void testPrioritizedMaxConcurrentVisitors();
+ void testVisitorQueingZeroQueueSize();
+ void testHitCounter();
+ void testStatusPage();
+};
+
+uint32_t VisitorManagerTest::docCount = 10;
+
+CPPUNIT_TEST_SUITE_REGISTRATION(VisitorManagerTest);
+
+void
+VisitorManagerTest::initializeTest()
+{
+ LOG(debug, "Initializing test");
+ vdstestlib::DirConfig config(getStandardConfig(true));
+ config.getConfig("stor-visitor").set("visitorthreads", "1");
+
+ try {
+ _messageSessionFactory.reset(
+ new TestVisitorMessageSessionFactory(config.getConfigId()));
+ _node.reset(
+ new TestServiceLayerApp(config.getConfigId()));
+ _node->setupDummyPersistence();
+ _node->getStateUpdater().setClusterState(
+ lib::ClusterState::CSP(
+ new lib::ClusterState("storage:1 distributor:1")));
+ _top.reset(new DummyStorageLink());
+ _top->push_back(std::unique_ptr<StorageLink>(_manager
+ = new VisitorManager(
+ config.getConfigId(), _node->getComponentRegister(),
+ *_messageSessionFactory)));
+ _top->push_back(std::unique_ptr<StorageLink>(new FileStorManager(
+ config.getConfigId(), _node->getPartitions(), _node->getPersistenceProvider(), _node->getComponentRegister())));
+ _manager->setTimeBetweenTicks(10);
+ _top->open();
+ } catch (config::InvalidConfigException& e) {
+ fprintf(stderr, "%s\n", e.what());
+ }
+ // Adding some documents so database isn't empty
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::string content(
+ "To be, or not to be: that is the question:\n"
+ "Whether 'tis nobler in the mind to suffer\n"
+ "The slings and arrows of outrageous fortune,\n"
+ "Or to take arms against a sea of troubles,\n"
+ "And by opposing end them? To die: to sleep;\n"
+ "No more; and by a sleep to say we end\n"
+ "The heart-ache and the thousand natural shocks\n"
+ "That flesh is heir to, 'tis a consummation\n"
+ "Devoutly to be wish'd. To die, to sleep;\n"
+ "To sleep: perchance to dream: ay, there's the rub;\n"
+ "For in that sleep of death what dreams may come\n"
+ "When we have shuffled off this mortal coil,\n"
+ "Must give us pause: there's the respect\n"
+ "That makes calamity of so long life;\n"
+ "For who would bear the whips and scorns of time,\n"
+ "The oppressor's wrong, the proud man's contumely,\n"
+ "The pangs of despised love, the law's delay,\n"
+ "The insolence of office and the spurns\n"
+ "That patient merit of the unworthy takes,\n"
+ "When he himself might his quietus make\n"
+ "With a bare bodkin? who would fardels bear,\n"
+ "To grunt and sweat under a weary life,\n"
+ "But that the dread of something after death,\n"
+ "The undiscover'd country from whose bourn\n"
+ "No traveller returns, puzzles the will\n"
+ "And makes us rather bear those ills we have\n"
+ "Than fly to others that we know not of?\n"
+ "Thus conscience does make cowards of us all;\n"
+ "And thus the native hue of resolution\n"
+ "Is sicklied o'er with the pale cast of thought,\n"
+ "And enterprises of great pith and moment\n"
+ "With this regard their currents turn awry,\n"
+ "And lose the name of action. - Soft you now!\n"
+ "The fair Ophelia! Nymph, in thy orisons\n"
+ "Be all my sins remember'd.\n");
+ for (uint32_t i=0; i<docCount; ++i) {
+ std::ostringstream uri;
+ uri << "userdoc:test:" << i % 10 << ":http://www.ntnu.no/"
+ << i << ".html";
+
+ _documents.push_back(document::Document::SP(
+ _node->getTestDocMan().createDocument(content, uri.str())));
+ const document::DocumentType& type(_documents.back()->getType());
+ _documents.back()->setValue(type.getField("headerval"),
+ document::IntFieldValue(i % 4));
+ }
+ for (uint32_t i=0; i<10; ++i) {
+ document::BucketId bid(16, i);
+
+ std::shared_ptr<api::CreateBucketCommand> cmd(
+ new api::CreateBucketCommand(bid));
+ cmd->setAddress(address);
+ cmd->setSourceIndex(0);
+ _top->sendDown(cmd);
+ _top->waitForMessages(1, 60);
+ _top->reset();
+
+ StorBucketDatabase::WrappedEntry entry(
+ _node->getStorageBucketDatabase().get(bid, "",
+ StorBucketDatabase::CREATE_IF_NONEXISTING));
+ entry->disk = 0;
+ entry.write();
+ }
+ for (uint32_t i=0; i<docCount; ++i) {
+ document::BucketId bid(16, i);
+
+ std::shared_ptr<api::PutCommand> cmd(
+ new api::PutCommand(bid, _documents[i], i+1));
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
+ std::shared_ptr<api::PutReply> reply(
+ std::dynamic_pointer_cast<api::PutReply>(
+ replies[0]));
+ CPPUNIT_ASSERT(reply.get());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK),
+ reply->getResult());
+ }
+ LOG(debug, "Done initializing test");
+}
+
+void
+VisitorManagerTest::addSomeRemoves(bool removeAll)
+{
+ framework::defaultimplementation::FakeClock clock;
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ for (uint32_t i=0; i<docCount; i += (removeAll ? 1 : 4)) {
+ // Add it to the database
+ document::BucketId bid(16, i % 10);
+ std::shared_ptr<api::RemoveCommand> cmd(
+ new api::RemoveCommand(
+ bid, _documents[i]->getId(), clock.getTimeInMicros().getTime() + docCount + i + 1));
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
+ std::shared_ptr<api::RemoveReply> reply(
+ std::dynamic_pointer_cast<api::RemoveReply>(
+ replies[0]));
+ CPPUNIT_ASSERT(reply.get());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK),
+ reply->getResult());
+ }
+}
+
+void
+VisitorManagerTest::tearDown()
+{
+ if (_top.get() != 0) {
+ _top->close();
+ _top->flush();
+ _top.reset(0);
+ }
+ _node.reset(0);
+ _messageSessionFactory.reset(0);
+ _manager = 0;
+}
+
+TestVisitorMessageSession&
+VisitorManagerTest::getSession(uint32_t n)
+{
+ // Wait until we have started the visitor
+ const std::vector<TestVisitorMessageSession*>& sessions(
+ _messageSessionFactory->_visitorSessions);
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ while (true) {
+ {
+ vespalib::LockGuard lock(_messageSessionFactory->_accessLock);
+ if (sessions.size() > n) {
+ return *sessions[n];
+ }
+ }
+ if (clock.getTimeInMillis() > endTime) {
+ throw vespalib::IllegalStateException(
+ "Timed out waiting for visitor session", VESPA_STRLOC);
+ }
+ FastOS_Thread::Sleep(10);
+ }
+ throw std::logic_error("unreachable");
+}
+
+void
+VisitorManagerTest::getMessagesAndReply(
+ int expectedCount,
+ TestVisitorMessageSession& session,
+ std::vector<document::Document::SP >& docs,
+ std::vector<document::DocumentId>& docIds,
+ api::ReturnCode::Result result,
+ documentapi::Priority::Value priority)
+{
+ for (int i = 0; i < expectedCount; i++) {
+ session.waitForMessages(i + 1);
+ mbus::Reply::UP reply;
+ {
+ vespalib::MonitorGuard guard(session.getMonitor());
+
+ CPPUNIT_ASSERT_EQUAL(priority,
+ session.sentMessages[i]->getPriority());
+
+ switch (session.sentMessages[i]->getType()) {
+ case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
+ docs.push_back(static_cast<documentapi::PutDocumentMessage&>(
+ *session.sentMessages[i]).getDocument());
+ break;
+ case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
+ docIds.push_back(static_cast<documentapi::RemoveDocumentMessage&>(
+ *session.sentMessages[i]).getDocumentId());
+ break;
+ default:
+ break;
+ }
+
+ reply = session.sentMessages[i]->createReply();
+ reply->swapState(*session.sentMessages[i]);
+ reply->setMessage(
+ mbus::Message::UP(session.sentMessages[i].release()));
+
+ if (result != api::ReturnCode::OK) {
+ reply->addError(mbus::Error(result, "Generic error"));
+ }
+ }
+
+ session.reply(std::move(reply));
+ }
+}
+
+uint64_t
+VisitorManagerTest::verifyCreateVisitorReply(
+ api::ReturnCode::Result expectedResult,
+ int checkStatsDocsVisited,
+ int checkStatsBytesVisited)
+{
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL(1, (int)replies.size());
+
+ std::shared_ptr<api::StorageMessage> msg(replies[0]);
+
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT(reply.get());
+ CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult());
+
+ if (checkStatsDocsVisited >= 0) {
+ CPPUNIT_ASSERT_EQUAL(checkStatsDocsVisited,
+ int(reply->getVisitorStatistics().getDocumentsVisited()));
+ }
+ if (checkStatsBytesVisited >= 0) {
+ CPPUNIT_ASSERT_EQUAL(checkStatsBytesVisited,
+ int(reply->getVisitorStatistics().getBytesVisited()));
+ }
+
+ return reply->getMsgId();
+}
+
+uint32_t
+VisitorManagerTest::getMatchingDocuments(std::vector<document::Document::SP >& docs) {
+ uint32_t equalCount = 0;
+ for (uint32_t i=0; i<docs.size(); ++i) {
+ for (uint32_t j=0; j<_documents.size(); ++j) {
+ if (docs[i]->getId() == _documents[j]->getId()
+ && *docs[i] == *_documents[j])
+
+ {
+ equalCount++;
+ }
+ }
+ }
+
+ return equalCount;
+}
+
+void
+VisitorManagerTest::testHitCounter()
+{
+ document::OrderingSpecification spec(document::OrderingSpecification::ASCENDING, 42, 7, 2);
+ Visitor::HitCounter hitCounter(&spec);
+
+ hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:42:foo"), 450);
+ hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:49:foo"), 450);
+ hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:60:foo"), 450);
+ hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:10:foo"), 450);
+ hitCounter.addHit(document::DocumentId("orderdoc(7,2):mail:1234:21:foo"), 450);
+
+ CPPUNIT_ASSERT_EQUAL(3, (int)hitCounter.getFirstPassHits());
+ CPPUNIT_ASSERT_EQUAL(1350, (int)hitCounter.getFirstPassBytes());
+ CPPUNIT_ASSERT_EQUAL(2, (int)hitCounter.getSecondPassHits());
+ CPPUNIT_ASSERT_EQUAL(900, (int)hitCounter.getSecondPassBytes());
+}
+
+namespace {
+
+int getTotalSerializedSize(const std::vector<document::Document::SP>& docs)
+{
+ int total = 0;
+ for (size_t i = 0; i < docs.size(); ++i) {
+ total += int(docs[i]->serialize()->getLength());
+ }
+ return total;
+}
+
+}
+
+void
+VisitorManagerTest::testNormalUsage()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setControlDestination("foo/bar");
+ _top->sendDown(cmd);
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ // Should receive one multioperation message (bucket 3 has one document).
+ getMessagesAndReply(1, getSession(0), docs, docIds);
+
+ // All data has been replied to, expecting to get a create visitor reply
+ verifyCreateVisitorReply(api::ReturnCode::OK,
+ int(docs.size()),
+ getTotalSerializedSize(docs));
+
+ CPPUNIT_ASSERT_EQUAL(1u, getMatchingDocuments(docs));
+ CPPUNIT_ASSERT(!_manager->hasPendingMessageState());
+}
+
+void
+VisitorManagerTest::testResending()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setControlDestination("foo/bar");
+ _top->sendDown(cmd);
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ TestVisitorMessageSession& session = getSession(0);
+ getMessagesAndReply(1, session, docs, docIds, api::ReturnCode::NOT_READY);
+
+ {
+ session.waitForMessages(2);
+
+ documentapi::DocumentMessage* msg = session.sentMessages[1].get();
+
+ mbus::Reply::UP reply = msg->createReply();
+
+ CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO,
+ session.sentMessages[1]->getType());
+ reply->swapState(*session.sentMessages[1]);
+ reply->setMessage(mbus::Message::UP(session.sentMessages[1].release()));
+ session.reply(std::move(reply));
+ }
+
+ _node->getClock().addSecondsToTime(1);
+
+ {
+ session.waitForMessages(3);
+
+ documentapi::DocumentMessage* msg = session.sentMessages[2].get();
+
+ mbus::Reply::UP reply = msg->createReply();
+
+ reply->swapState(*session.sentMessages[2]);
+ reply->setMessage(mbus::Message::UP(session.sentMessages[2].release()));
+ session.reply(std::move(reply));
+ }
+
+ // All data has been replied to, expecting to get a create visitor reply
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+}
+
+void
+VisitorManagerTest::testVisitEmptyBucket()
+{
+ initializeTest();
+ addSomeRemoves(true);
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+
+ // All data has been replied to, expecting to get a create visitor reply
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+}
+
+void
+VisitorManagerTest::testMultiBucketVisit()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ for (uint32_t i=0; i<10; ++i) {
+ cmd->addBucketToBeVisited(document::BucketId(16, i));
+ }
+ cmd->setAddress(address);
+ cmd->setDataDestination("fooclient.0");
+ _top->sendDown(cmd);
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ // Should receive one multioperation message for each bucket
+ getMessagesAndReply(10, getSession(0), docs, docIds);
+
+ // All data has been replied to, expecting to get a create visitor reply
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ CPPUNIT_ASSERT_EQUAL(docCount, getMatchingDocuments(docs));
+}
+
+void
+VisitorManagerTest::testNoBuckets()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+
+ // Should get one reply; a CreateVisitorReply with error since no
+ // buckets where specified in the CreateVisitorCommand
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(
+ replies[0]));
+ // Verify that cast went ok => it was a CreateVisitorReply message
+ CPPUNIT_ASSERT(reply.get());
+ api::ReturnCode ret(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "No buckets specified");
+ CPPUNIT_ASSERT_EQUAL(ret, reply->getResult());
+}
+
+void VisitorManagerTest::testVisitPutsAndRemoves()
+{
+ initializeTest();
+ addSomeRemoves();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->setAddress(address);
+ cmd->setVisitRemoves();
+ for (uint32_t i=0; i<10; ++i) {
+ cmd->addBucketToBeVisited(document::BucketId(16, i));
+ }
+ _top->sendDown(cmd);
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ getMessagesAndReply(10, getSession(0), docs, docIds);
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ CPPUNIT_ASSERT_EQUAL(
+ docCount - (docCount + 3) / 4,
+ getMatchingDocuments(docs));
+
+ CPPUNIT_ASSERT_EQUAL(
+ (size_t) (docCount + 3) / 4,
+ docIds.size());
+}
+
+void VisitorManagerTest::testVisitWithTimeframeAndSelection()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis",
+ "testdoctype1.headerval < 2"));
+ cmd->setFromTime(3);
+ cmd->setToTime(8);
+ for (uint32_t i=0; i<10; ++i) {
+ cmd->addBucketToBeVisited(document::BucketId(16, i));
+ }
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ getMessagesAndReply(2, getSession(0), docs, docIds);
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ CPPUNIT_ASSERT_EQUAL((size_t) 2, docs.size());
+ std::set<std::string> expected;
+ expected.insert("userdoc:test:4:http://www.ntnu.no/4.html");
+ expected.insert("userdoc:test:5:http://www.ntnu.no/5.html");
+ std::set<std::string> actual;
+ for (uint32_t i=0; i<docs.size(); ++i) {
+ actual.insert(docs[i]->getId().toString());
+ }
+ CPPUNIT_ASSERT_EQUAL(expected, actual);
+}
+
+void VisitorManagerTest::testVisitWithTimeframeAndBogusSelection()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis",
+ "DocType(testdoctype1---///---) XXX BAD Field(headerval) < 2"));
+ cmd->setFromTime(3);
+ cmd->setToTime(8);
+ for (uint32_t i=0; i<10; ++i) {
+ cmd->addBucketToBeVisited(document::BucketId(16, i));
+ }
+ cmd->setAddress(address);
+
+ _top->sendDown(cmd);
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, replies.size());
+
+ api::StorageReply* reply = dynamic_cast<api::StorageReply*>(
+ replies.front().get());
+ CPPUNIT_ASSERT(reply);
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ILLEGAL_PARAMETERS,
+ reply->getResult().getResult());
+}
+
+void
+VisitorManagerTest::testVisitorCallbacks()
+{
+ initializeTest();
+ std::ostringstream replydata;
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("TestVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->addBucketToBeVisited(document::BucketId(16, 5));
+ cmd->setAddress(address);
+ _top->sendDown(cmd);
+
+ // Wait until we have started the visitor
+ TestVisitorMessageSession& session = getSession(0);
+
+ for (uint32_t i = 0; i < 6; i++) {
+ session.waitForMessages(i + 1);
+ mbus::Reply::UP reply;
+ {
+ vespalib::MonitorGuard guard(session.getMonitor());
+
+ CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_MAPVISITOR, session.sentMessages[i]->getType());
+
+ documentapi::MapVisitorMessage* mapvisitormsg(
+ static_cast<documentapi::MapVisitorMessage*>(session.sentMessages[i].get()));
+
+ replydata << mapvisitormsg->getData().get("msg");
+
+ reply = mapvisitormsg->createReply();
+ reply->swapState(*session.sentMessages[i]);
+ reply->setMessage(mbus::Message::UP(session.sentMessages[i].release()));
+ }
+ session.reply(std::move(reply));
+ }
+
+ // All data has been replied to, expecting to get a create visitor reply
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 1, "Starting visitor");
+ CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 2, "Handling block of 1 documents");
+ CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 2, "completedBucket");
+ CPPUNIT_ASSERT_SUBSTRING_COUNT(replydata.str(), 1, "completedVisiting");
+}
+
+void
+VisitorManagerTest::testVisitorCleanup()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+
+ // Start a bunch of invalid visitors
+ for (uint32_t i=0; i<10; ++i) {
+ std::ostringstream ost;
+ ost << "testvis" << i;
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("InvalidVisitor", ost.str(), ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ _top->sendDown(cmd);
+ _top->waitForMessages(i+1, 60);
+ }
+
+ // Start a bunch of visitors
+ for (uint32_t i=0; i<10; ++i) {
+ std::ostringstream ost;
+ ost << "testvis" << (i + 10);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", ost.str(), ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ _top->sendDown(cmd);
+ }
+
+
+ // Should get 14 immediate replies - 10 failures and 4 busy
+ {
+ _top->waitForMessages(14, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+
+ int failures = 0;
+ int busy = 0;
+
+ for (uint32_t i=0; i< 14; ++i) {
+ std::shared_ptr<api::StorageMessage> msg(replies[i]);
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT(reply.get());
+
+ if (i < 10) {
+ if (api::ReturnCode::ILLEGAL_PARAMETERS == reply->getResult().getResult()) {
+ failures++;
+ } else {
+ std::cerr << reply->getResult() << "\n";
+ }
+ } else {
+ if (api::ReturnCode::BUSY == reply->getResult().getResult()) {
+ busy++;
+ }
+ }
+ }
+
+ CPPUNIT_ASSERT_EQUAL(10, failures);
+ CPPUNIT_ASSERT_EQUAL(4, busy);
+ }
+
+ // Finish a visitor
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ getMessagesAndReply(1, getSession(0), docs, docIds);
+
+ // Should get a reply for the visitor.
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ // Fail a visitor
+ getMessagesAndReply(1, getSession(1), docs, docIds, api::ReturnCode::INTERNAL_FAILURE);
+
+ // Should get a reply for the visitor.
+ verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE);
+
+ while (_manager->getActiveVisitorCount() > 2) {
+ FastOS_Thread::Sleep(10);
+ }
+
+ // Start a bunch of more visitors
+ for (uint32_t i=0; i<10; ++i) {
+ std::ostringstream ost;
+ ost << "testvis" << (i + 24);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", ost.str(), ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ _top->sendDown(cmd);
+ }
+
+ // Should now get 8 busy.
+ _top->waitForMessages(8, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL(8, (int)replies.size());
+
+ for (uint32_t i=0; i< replies.size(); ++i) {
+ std::shared_ptr<api::StorageMessage> msg(replies[i]);
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT(reply.get());
+
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::BUSY, reply->getResult().getResult());
+ }
+}
+
+void
+VisitorManagerTest::testAbortOnFailedVisitorInfo()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+
+ {
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ _top->sendDown(cmd);
+ }
+
+ uint32_t visitorRepliesReceived = 0;
+ uint32_t oki = 0;
+ uint32_t failed = 0;
+
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ TestVisitorMessageSession& session = getSession(0);
+ getMessagesAndReply(1, session, docs, docIds, api::ReturnCode::NOT_READY);
+
+ {
+ session.waitForMessages(2);
+
+ documentapi::DocumentMessage* cmd = session.sentMessages[1].get();
+
+ mbus::Reply::UP reply = cmd->createReply();
+
+ CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::MESSAGE_VISITORINFO, session.sentMessages[1]->getType());
+ reply->swapState(*session.sentMessages[1]);
+ reply->setMessage(mbus::Message::UP(session.sentMessages[1].release()));
+ reply->addError(mbus::Error(api::ReturnCode::NOT_CONNECTED, "Me no ready"));
+ session.reply(std::move(reply));
+ }
+
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ for (uint32_t i=0; i< replies.size(); ++i) {
+ std::shared_ptr<api::StorageMessage> msg(replies[i]);
+ if (msg->getType() == api::MessageType::VISITOR_CREATE_REPLY)
+ {
+ ++visitorRepliesReceived;
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT(reply.get());
+ if (reply->getResult().success()) {
+ ++oki;
+ std::cerr << "\n" << reply->toString(true) << "\n";
+ } else {
+ ++failed;
+ }
+ }
+ }
+
+ std::ostringstream errmsg;
+ errmsg << "oki " << oki << ", failed " << failed;
+
+ CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 0u, oki);
+ CPPUNIT_ASSERT_EQUAL_MSG(errmsg.str(), 1u, failed);
+}
+
+void
+VisitorManagerTest::testAbortOnFieldPathError()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+
+ // Use bogus field path to force error to happen
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor",
+ "testvis",
+ "testdoctype1.headerval{bogus} == 1234"));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ _top->sendDown(cmd);
+
+ verifyCreateVisitorReply(api::ReturnCode::ILLEGAL_PARAMETERS);
+}
+
+void
+VisitorManagerTest::testVisitorQueueTimeout()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ _manager->enforceQueueUsage();
+
+ {
+ vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor());
+
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(1);
+ cmd->setTimeout(100 * 1000 * 1000);
+ _top->sendDown(cmd);
+
+ _node->getClock().addSecondsToTime(1000);
+ }
+
+ // Don't answer any messages. Make sure we timeout anyways.
+ uint32_t visitorRepliesReceived = 0;
+
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ std::shared_ptr<api::StorageMessage> msg(replies[0]);
+
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+ ++visitorRepliesReceived;
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::BUSY,
+ "Visitor timed out in visitor queue"),
+ reply->getResult());
+}
+
+void
+VisitorManagerTest::testVisitorProcessingTimeout()
+{
+ initializeTest();
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(0);
+ cmd->setTimeout(100);
+ _top->sendDown(cmd);
+
+ // Wait for Put before increasing the clock
+ TestVisitorMessageSession& session = getSession(0);
+ session.waitForMessages(1);
+
+ _node->getClock().addSecondsToTime(1000);
+
+ // Don't answer any messages. Make sure we timeout anyways.
+ uint32_t visitorRepliesReceived = 0;
+
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ std::shared_ptr<api::StorageMessage> msg(replies[0]);
+
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+ ++visitorRepliesReceived;
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ reply->getResult().getResult());
+}
+
+namespace {
+ uint32_t nextVisitor = 0;
+
+ api::StorageMessage::Id
+ sendCreateVisitor(uint32_t timeout, DummyStorageLink& top, uint8_t priority = 127) {
+ std::ostringstream ost;
+ ost << "testvis" << ++nextVisitor;
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand("DumpVisitor", ost.str(), ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setQueueTimeout(timeout);
+ cmd->setPriority(priority);
+ top.sendDown(cmd);
+ return cmd->getMsgId();
+ }
+}
+
+void
+VisitorManagerTest::testPrioritizedVisitorQueing()
+{
+ framework::HttpUrlPath path("?verbose=true&allvisitors=true");
+ initializeTest();
+
+ _manager->setMaxConcurrentVisitors(4);
+ _manager->setMaxVisitorQueueSize(4);
+
+ api::StorageMessage::Id ids[10] = { 0 };
+
+ // First 4 should just start..
+ for (uint32_t i = 0; i < 4; ++i) {
+ ids[i] = sendCreateVisitor(i, *_top, i);
+ }
+
+ // Next ones should be queued - (Better not finish before we get here)
+ // Submit with higher priorities
+ for (uint32_t i = 0; i < 4; ++i) {
+ ids[i + 4] = sendCreateVisitor(1000, *_top, 100 - i);
+ }
+
+ // Queue is now full with a pri 100 visitor at its end
+ // Send a lower pri visitor that will be busy-returned immediately
+ ids[8] = sendCreateVisitor(1000, *_top, 130);
+
+ CPPUNIT_ASSERT_EQUAL(ids[8], verifyCreateVisitorReply(api::ReturnCode::BUSY));
+
+ // Send a higher pri visitor that will take the place of pri 100 visitor
+ ids[9] = sendCreateVisitor(1000, *_top, 60);
+
+ CPPUNIT_ASSERT_EQUAL(ids[4], verifyCreateVisitorReply(api::ReturnCode::BUSY));
+
+ // Finish the first visitor
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+ getMessagesAndReply(1, getSession(0), docs, docIds, api::ReturnCode::OK,
+ documentapi::Priority::PRI_HIGHEST);
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+
+ // We should now start the highest priority visitor.
+ getMessagesAndReply(1, getSession(4), docs, docIds, api::ReturnCode::OK,
+ documentapi::Priority::PRI_VERY_HIGH);
+ CPPUNIT_ASSERT_EQUAL(ids[9], verifyCreateVisitorReply(api::ReturnCode::OK));
+}
+
+void
+VisitorManagerTest::testPrioritizedMaxConcurrentVisitors() {
+ framework::HttpUrlPath path("?verbose=true&allvisitors=true");
+ initializeTest();
+
+ api::StorageMessage::Id ids[17] = { 0 };
+
+ // Number of concurrent visitors is in [4, 8], depending on priority
+ // Max concurrent:
+ // [0, 1): 4
+ // [1, 64): 3
+ // [64, 128): 2
+ // [128, 192): 1
+ // [192, 256): 0
+ _manager->setMaxConcurrentVisitors(4, 4);
+ _manager->setMaxVisitorQueueSize(6);
+
+ // First 4 should just start..
+ for (uint32_t i = 0; i < 4; ++i) {
+ ids[i] = sendCreateVisitor(i, *_top, i);
+ }
+
+ // Low pri messages; get put into queue
+ for (uint32_t i = 0; i < 6; ++i) {
+ ids[i + 4] = sendCreateVisitor(1000, *_top, 203 - i);
+ }
+
+ // Higher pri message: fits happily into 1 extra concurrent slot
+ ids[10] = sendCreateVisitor(1000, *_top, 190);
+
+ // Should punch pri203 msg out of the queue -> busy
+ ids[11] = sendCreateVisitor(1000, *_top, 197);
+
+ CPPUNIT_ASSERT_EQUAL(ids[4], verifyCreateVisitorReply(api::ReturnCode::BUSY));
+
+ // No concurrency slots left for this message -> busy
+ ids[12] = sendCreateVisitor(1000, *_top, 204);
+
+ CPPUNIT_ASSERT_EQUAL(ids[12], verifyCreateVisitorReply(api::ReturnCode::BUSY));
+
+ // Gets a concurrent slot
+ ids[13] = sendCreateVisitor(1000, *_top, 80);
+
+ // Kicks pri 202 out of the queue -> busy
+ ids[14] = sendCreateVisitor(1000, *_top, 79);
+
+ CPPUNIT_ASSERT_EQUAL(ids[5], verifyCreateVisitorReply(api::ReturnCode::BUSY));
+
+ // Gets a concurrent slot
+ ids[15] = sendCreateVisitor(1000, *_top, 63);
+
+ // Very Important Visitor(tm) gets a concurrent slot
+ ids[16] = sendCreateVisitor(1000, *_top, 0);
+
+ std::vector<document::Document::SP > docs;
+ std::vector<document::DocumentId> docIds;
+
+ std::set<uint64_t> finishedVisitors;
+
+ // Verify that the correct visitors are running.
+ for (int i = 0; i < 8; i++) {
+ documentapi::Priority::Value priority =
+ documentapi::Priority::PRI_HIGHEST; // ids 0-3,16
+ if (i == 4) {
+ priority = documentapi::Priority::PRI_VERY_LOW; // ids 10
+ } else if (i == 5) {
+ priority = documentapi::Priority::PRI_HIGH_2; // ids 13
+ } else if (i == 6) {
+ priority = documentapi::Priority::PRI_HIGH_1; // ids 15
+ }
+ getMessagesAndReply(1, getSession(i), docs, docIds, api::ReturnCode::OK,
+ priority);
+ finishedVisitors.insert(verifyCreateVisitorReply(api::ReturnCode::OK));
+ }
+
+ for (int i = 0; i < 4; i++) {
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[i]) != finishedVisitors.end());
+ }
+
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[10]) != finishedVisitors.end());
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[13]) != finishedVisitors.end());
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[15]) != finishedVisitors.end());
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[16]) != finishedVisitors.end());
+
+ finishedVisitors.clear();
+
+ for (int i = 8; i < 14; i++) {
+ documentapi::Priority::Value priority =
+ documentapi::Priority::PRI_LOWEST; // ids 6-9,11
+ if (i == 8) {
+ priority = documentapi::Priority::PRI_HIGH_2; // ids 14
+ }
+ getMessagesAndReply(1, getSession(i), docs, docIds, api::ReturnCode::OK,
+ priority);
+ uint64_t msgId = verifyCreateVisitorReply(api::ReturnCode::OK);
+ finishedVisitors.insert(msgId);
+ }
+
+ for (int i = 6; i < 10; i++) {
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[i]) != finishedVisitors.end());
+ }
+
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[11]) != finishedVisitors.end());
+ CPPUNIT_ASSERT(finishedVisitors.find(ids[14]) != finishedVisitors.end());
+}
+
+void
+VisitorManagerTest::testVisitorQueingZeroQueueSize() {
+ framework::HttpUrlPath path("?verbose=true&allvisitors=true");
+ initializeTest();
+
+ _manager->setMaxConcurrentVisitors(4);
+ _manager->setMaxVisitorQueueSize(0);
+
+ // First 4 should just start..
+ for (uint32_t i = 0; i < 4; ++i) {
+ sendCreateVisitor(i, *_top, i);
+ }
+ // Queue size is zero, all visitors will be busy-returned
+ for (uint32_t i = 0; i < 5; ++i) {
+ sendCreateVisitor(1000, *_top, 100 - i);
+ verifyCreateVisitorReply(api::ReturnCode::BUSY);
+ }
+}
+
+void
+VisitorManagerTest::testStatusPage() {
+ framework::HttpUrlPath path("?verbose=true&allvisitors=true");
+ initializeTest();
+
+ _manager->setMaxConcurrentVisitors(1, 1);
+ _manager->setMaxVisitorQueueSize(6);
+ // 1 running, 1 queued
+ sendCreateVisitor(1000000, *_top, 1);
+ sendCreateVisitor(1000000, *_top, 128);
+
+ TestVisitorMessageSession& session = getSession(0);
+ session.waitForMessages(1);
+
+ std::ostringstream ss;
+ static_cast<framework::HtmlStatusReporter&>(*_manager).reportHtmlStatus(ss, path);
+
+ std::string str(ss.str());
+ CPPUNIT_ASSERT(str.find("Currently running visitors") != std::string::npos);
+ // Should be propagated to visitor thread
+ CPPUNIT_ASSERT(str.find("Running 1 visitors") != std::string::npos); // 1 active
+ CPPUNIT_ASSERT(str.find("waiting visitors 1") != std::string::npos); // 1 queued
+ CPPUNIT_ASSERT(str.find("Visitor thread 0") != std::string::npos);
+ CPPUNIT_ASSERT(str.find("Disconnected visitor timeout") != std::string::npos); // verbose per thread
+ CPPUNIT_ASSERT(str.find("Message #1 <b>putdocumentmessage</b>") != std::string::npos); // 1 active
+}
+
+}
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
new file mode 100644
index 00000000000..aed08a676b8
--- /dev/null
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -0,0 +1,1023 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/document/datatype/datatype.h>
+#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/fieldvalue/stringfieldvalue.h>
+#include <vespa/document/fieldvalue/rawfieldvalue.h>
+#include <vespa/log/log.h>
+#include <vespa/storageapi/message/datagram.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/visiting/visitormanager.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/dummystoragelink.h>
+#include <tests/storageserver/testvisitormessagesession.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vdslib/container/visitorordering.h>
+#include <vespa/documentapi/messagebus/messages/multioperationmessage.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vector>
+#include <thread>
+#include <chrono>
+
+LOG_SETUP(".visitortest");
+
+using namespace std::chrono_literals;
+
+namespace storage {
+
+namespace {
+
+using msg_ptr_vector = std::vector<api::StorageMessage::SP>;
+
+struct TestParams
+{
+ TestParams& iteratorsPerBucket(uint32_t n) {
+ _iteratorsPerBucket = n;
+ return *this;
+ }
+ TestParams& maxVisitorMemoryUsage(uint32_t bytes) {
+ _maxVisitorMemoryUsage = bytes;
+ return *this;
+ }
+ TestParams& parallelBuckets(uint32_t n) {
+ _parallelBuckets = n;
+ return *this;
+ }
+ TestParams& autoReplyError(const mbus::Error& error) {
+ _autoReplyError = error;
+ return *this;
+ }
+
+ uint32_t _iteratorsPerBucket {1};
+ uint32_t _maxVisitorMemoryUsage {UINT32_MAX};
+ uint32_t _parallelBuckets {1};
+ mbus::Error _autoReplyError;
+};
+
+}
+
+class VisitorTest : public CppUnit::TestFixture
+{
+private:
+ CPPUNIT_TEST_SUITE(VisitorTest);
+ CPPUNIT_TEST(testNormalUsage);
+ CPPUNIT_TEST(testFailedCreateIterator);
+ CPPUNIT_TEST(testFailedGetIter);
+ CPPUNIT_TEST(testMultipleFailedGetIter);
+ CPPUNIT_TEST(testDocumentAPIClientError);
+ CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor);
+ CPPUNIT_TEST(testIteratorCreatedForFailedVisitor);
+ CPPUNIT_TEST(testFailedDocumentAPISend);
+ CPPUNIT_TEST(testNoVisitorNotificationForTransientFailures);
+ CPPUNIT_TEST(testNotificationSentIfTransientErrorRetriedManyTimes);
+ CPPUNIT_TEST(testNoMbusTracingIfTraceLevelIsZero);
+ CPPUNIT_TEST(testReplyContainsTraceIfTraceLevelAboveZero);
+ CPPUNIT_TEST(testNoMoreIteratorsSentWhileMemoryUsedAboveLimit);
+ CPPUNIT_TEST(testDumpVisitorInvokesStrongReadConsistencyIteration);
+ CPPUNIT_TEST(testTestVisitorInvokesWeakReadConsistencyIteration);
+ CPPUNIT_TEST_SUITE_END();
+
+ static uint32_t docCount;
+ std::vector<document::Document::SP > _documents;
+ std::unique_ptr<TestVisitorMessageSessionFactory> _messageSessionFactory;
+ std::unique_ptr<TestServiceLayerApp> _node;
+ std::unique_ptr<DummyStorageLink> _top;
+ DummyStorageLink* _bottom;
+ VisitorManager* _manager;
+
+public:
+ VisitorTest() : _node() {}
+
+ void testNormalUsage();
+ void testFailedCreateIterator();
+ void testFailedGetIter();
+ void testMultipleFailedGetIter();
+ void testDocumentAPIClientError();
+ void testNoDocumentAPIResendingForFailedVisitor();
+ void testIteratorCreatedForFailedVisitor();
+ void testFailedDocumentAPISend();
+ void testNoVisitorNotificationForTransientFailures();
+ void testNotificationSentIfTransientErrorRetriedManyTimes();
+ void testNoMbusTracingIfTraceLevelIsZero();
+ void testReplyContainsTraceIfTraceLevelAboveZero();
+ void testNoMoreIteratorsSentWhileMemoryUsedAboveLimit();
+ void testDumpVisitorInvokesStrongReadConsistencyIteration();
+ void testTestVisitorInvokesWeakReadConsistencyIteration();
+ // TODO:
+ void testVisitMultipleBuckets() {}
+
+ // Not using setUp since can't throw exception out of it.
+ void initializeTest(const TestParams& params = TestParams());
+
+ struct VisitorOptions {
+ std::string visitorType{"dumpvisitor"};
+
+ VisitorOptions() {}
+
+ VisitorOptions& withVisitorType(vespalib::stringref type) {
+ visitorType = type;
+ return *this;
+ }
+ };
+
+ std::shared_ptr<api::CreateVisitorCommand> makeCreateVisitor(
+ const VisitorOptions& options = VisitorOptions());
+ void tearDown();
+ bool waitUntilNoActiveVisitors();
+ TestVisitorMessageSession& getSession(uint32_t n);
+ uint64_t verifyCreateVisitorReply(
+ api::ReturnCode::Result expectedResult,
+ int checkStatsDocsVisited = -1,
+ int checkStatsBytesVisited = -1);
+ void getMessagesAndReply(
+ int expectedCount,
+ TestVisitorMessageSession& session,
+ std::vector<document::Document::SP >& docs,
+ std::vector<document::DocumentId>& docIds,
+ std::vector<std::string>& infoMessages,
+ api::ReturnCode::Result returnCode = api::ReturnCode::OK);
+ uint32_t getMatchingDocuments(std::vector<document::Document::SP >& docs);
+
+private:
+ void doTestVisitorInstanceHasConsistencyLevel(
+ vespalib::stringref visitorType,
+ spi::ReadConsistency expectedConsistency);
+
+ template <typename T>
+ std::vector<std::shared_ptr<T> >
+ fetchMultipleCommands(DummyStorageLink& link, size_t count);
+
+ template <typename T>
+ std::shared_ptr<T>
+ fetchSingleCommand(DummyStorageLink& link);
+
+ void sendGetIterReply(GetIterCommand& cmd,
+ const api::ReturnCode& result =
+ api::ReturnCode(api::ReturnCode::OK),
+ uint32_t maxDocuments = 0,
+ bool overrideCompleted = false);
+ void sendCreateIteratorReply(uint64_t iteratorId = 1234);
+ std::shared_ptr<api::CreateVisitorReply> doCompleteVisitingSession(
+ const std::shared_ptr<api::CreateVisitorCommand>& cmd);
+
+ void sendInitialCreateVisitorAndGetIterRound();
+
+ int64_t getFailedVisitorDestinationReplyCount() const {
+ // There's no metric manager attached to these tests, so even if the
+ // test should magically freeze here for 5+ minutes, nothing should
+ // come in and wipe our accumulated failure metrics.
+ // Only 1 visitor thread running, so we know it has the metrics.
+ const auto& metrics = _manager->getThread(0).getMetrics();
+ auto loadType = documentapi::LoadType::DEFAULT;
+ return metrics.visitorDestinationFailureReplies[loadType].getCount();
+ }
+};
+
+uint32_t VisitorTest::docCount = 10;
+
+CPPUNIT_TEST_SUITE_REGISTRATION(VisitorTest);
+
+void
+VisitorTest::initializeTest(const TestParams& params)
+{
+ LOG(debug, "Initializing test");
+ vdstestlib::DirConfig config(getStandardConfig(true));
+ config.getConfig("stor-visitor").set("visitorthreads", "1");
+ config.getConfig("stor-visitor").set(
+ "iterators_per_bucket",
+ std::to_string(params._iteratorsPerBucket));
+ config.getConfig("stor-visitor").set(
+ "defaultparalleliterators",
+ std::to_string(params._parallelBuckets));
+ config.getConfig("stor-visitor").set(
+ "visitor_memory_usage_limit",
+ std::to_string(params._maxVisitorMemoryUsage));
+
+ system("chmod 755 vdsroot 2>/dev/null");
+ system("rm -rf vdsroot* 2>/dev/null");
+ assert(system("mkdir -p vdsroot/disks/d0") == 0);
+ assert(system("mkdir -p vdsroot/disks/d1") == 0);
+
+ try {
+ _messageSessionFactory.reset(
+ new TestVisitorMessageSessionFactory(config.getConfigId()));
+ if (params._autoReplyError.getCode() != mbus::ErrorCode::NONE) {
+ _messageSessionFactory->_autoReplyError = params._autoReplyError;
+ _messageSessionFactory->_createAutoReplyVisitorSessions = true;
+ }
+ _node.reset(new TestServiceLayerApp(config.getConfigId()));
+ _top.reset(new DummyStorageLink());
+ _top->push_back(std::unique_ptr<StorageLink>(_manager
+ = new VisitorManager(
+ config.getConfigId(),
+ _node->getComponentRegister(), *_messageSessionFactory)));
+ _bottom = new DummyStorageLink();
+ _top->push_back(std::unique_ptr<StorageLink>(_bottom));
+ _manager->setTimeBetweenTicks(10);
+ _top->open();
+ } catch (config::InvalidConfigException& e) {
+ fprintf(stderr, "%s\n", e.what());
+ }
+ std::string content(
+ "To be, or not to be: that is the question:\n"
+ "Whether 'tis nobler in the mind to suffer\n"
+ "The slings and arrows of outrageous fortune,\n"
+ "Or to take arms against a sea of troubles,\n"
+ "And by opposing end them? To die: to sleep;\n"
+ "No more; and by a sleep to say we end\n"
+ "The heart-ache and the thousand natural shocks\n"
+ "That flesh is heir to, 'tis a consummation\n"
+ "Devoutly to be wish'd. To die, to sleep;\n"
+ "To sleep: perchance to dream: ay, there's the rub;\n"
+ "For in that sleep of death what dreams may come\n"
+ "When we have shuffled off this mortal coil,\n"
+ "Must give us pause: there's the respect\n"
+ "That makes calamity of so long life;\n"
+ "For who would bear the whips and scorns of time,\n"
+ "The oppressor's wrong, the proud man's contumely,\n"
+ "The pangs of despised love, the law's delay,\n"
+ "The insolence of office and the spurns\n"
+ "That patient merit of the unworthy takes,\n"
+ "When he himself might his quietus make\n"
+ "With a bare bodkin? who would fardels bear,\n"
+ "To grunt and sweat under a weary life,\n"
+ "But that the dread of something after death,\n"
+ "The undiscover'd country from whose bourn\n"
+ "No traveller returns, puzzles the will\n"
+ "And makes us rather bear those ills we have\n"
+ "Than fly to others that we know not of?\n"
+ "Thus conscience does make cowards of us all;\n"
+ "And thus the native hue of resolution\n"
+ "Is sicklied o'er with the pale cast of thought,\n"
+ "And enterprises of great pith and moment\n"
+ "With this regard their currents turn awry,\n"
+ "And lose the name of action. - Soft you now!\n"
+ "The fair Ophelia! Nymph, in thy orisons\n"
+ "Be all my sins remember'd.\n");
+ _documents.clear();
+ for (uint32_t i=0; i<docCount; ++i) {
+ std::ostringstream uri;
+ uri << "userdoc:test:" << i % 10 << ":http://www.ntnu.no/"
+ << i << ".html";
+
+ _documents.push_back(document::Document::SP(
+ _node->getTestDocMan().createDocument(content, uri.str())));
+ const document::DocumentType& type(_documents.back()->getType());
+ _documents.back()->setValue(type.getField("headerval"),
+ document::IntFieldValue(i % 4));
+ }
+ LOG(debug, "Done initializing test");
+}
+
+void
+VisitorTest::tearDown()
+{
+ if (_top.get() != 0) {
+ _top->close();
+ _top->flush();
+ _top.reset(0);
+ }
+ _node.reset(0);
+ _messageSessionFactory.reset(0);
+ _manager = 0;
+}
+
+bool
+VisitorTest::waitUntilNoActiveVisitors()
+{
+ int i = 0;
+ for (; i < 1000; ++i) {
+ if (_manager->getActiveVisitorCount() == 0) {
+ return true;
+ }
+ std::this_thread::sleep_for(10ms);
+ }
+ return false;
+}
+
+TestVisitorMessageSession&
+VisitorTest::getSession(uint32_t n)
+{
+ // Wait until we have started the visitor
+ const std::vector<TestVisitorMessageSession*>& sessions(
+ _messageSessionFactory->_visitorSessions);
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(30 * 1000));
+ while (true) {
+ {
+ vespalib::LockGuard lock(_messageSessionFactory->_accessLock);
+ if (sessions.size() > n) {
+ return *sessions[n];
+ }
+ }
+ if (clock.getTimeInMillis() > endTime) {
+ throw vespalib::IllegalStateException(
+ "Timed out waiting for visitor session", VESPA_STRLOC);
+ }
+ std::this_thread::sleep_for(10ms);
+ }
+ throw std::logic_error("unreachable");
+}
+
+void
+VisitorTest::getMessagesAndReply(
+ int expectedCount,
+ TestVisitorMessageSession& session,
+ std::vector<document::Document::SP >& docs,
+ std::vector<document::DocumentId>& docIds,
+ std::vector<std::string>& infoMessages,
+ api::ReturnCode::Result result)
+{
+ for (int i = 0; i < expectedCount; i++) {
+ session.waitForMessages(1);
+ mbus::Reply::UP reply;
+ {
+ vespalib::MonitorGuard guard(session.getMonitor());
+ CPPUNIT_ASSERT(!session.sentMessages.empty());
+ vespalib::LinkedPtr<documentapi::DocumentMessage> msg(
+ session.sentMessages.front());
+ CPPUNIT_ASSERT(msg->getPriority() < 16);
+
+ switch (msg->getType()) {
+ case documentapi::DocumentProtocol::MESSAGE_PUTDOCUMENT:
+ docs.push_back(
+ static_cast<documentapi::PutDocumentMessage&>(*msg)
+ .getDocument());
+ break;
+ case documentapi::DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
+ docIds.push_back(
+ static_cast<documentapi::RemoveDocumentMessage&>(*msg)
+ .getDocumentId());
+ break;
+ case documentapi::DocumentProtocol::MESSAGE_VISITORINFO:
+ infoMessages.push_back(
+ static_cast<documentapi::VisitorInfoMessage&>(*msg)
+ .getErrorMessage());
+ break;
+ default:
+ break;
+ }
+
+ reply = msg->createReply();
+ reply->swapState(*msg);
+
+ session.sentMessages.pop_front(); // Release linked ptr ref.
+ reply->setMessage(mbus::Message::UP(msg.release()));
+
+ if (result != api::ReturnCode::OK) {
+ reply->addError(mbus::Error(result, "Generic error"));
+ }
+ }
+ session.reply(std::move(reply));
+ }
+}
+
+uint64_t
+VisitorTest::verifyCreateVisitorReply(
+ api::ReturnCode::Result expectedResult,
+ int checkStatsDocsVisited,
+ int checkStatsBytesVisited)
+{
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL(1, (int)replies.size());
+
+ std::shared_ptr<api::StorageMessage> msg(replies[0]);
+
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY, msg->getType());
+
+ std::shared_ptr<api::CreateVisitorReply> reply(
+ std::dynamic_pointer_cast<api::CreateVisitorReply>(msg));
+ CPPUNIT_ASSERT(reply.get());
+ CPPUNIT_ASSERT_EQUAL(expectedResult, reply->getResult().getResult());
+
+ if (checkStatsDocsVisited >= 0) {
+ CPPUNIT_ASSERT_EQUAL(checkStatsDocsVisited,
+ int(reply->getVisitorStatistics().getDocumentsVisited()));
+ }
+ if (checkStatsBytesVisited >= 0) {
+ CPPUNIT_ASSERT_EQUAL(checkStatsBytesVisited,
+ int(reply->getVisitorStatistics().getBytesVisited()));
+ }
+
+ return reply->getMsgId();
+}
+
+uint32_t
+VisitorTest::getMatchingDocuments(std::vector<document::Document::SP >& docs) {
+ uint32_t equalCount = 0;
+ for (uint32_t i=0; i<docs.size(); ++i) {
+ for (uint32_t j=0; j<_documents.size(); ++j) {
+ if (*docs[i] == *_documents[j] &&
+ docs[i]->getId() == _documents[j]->getId())
+ {
+ equalCount++;
+ }
+ }
+ }
+
+ return equalCount;
+}
+
+void
+VisitorTest::sendGetIterReply(GetIterCommand& cmd,
+ const api::ReturnCode& result,
+ uint32_t maxDocuments,
+ bool overrideCompleted)
+{
+ GetIterReply::SP reply(new GetIterReply(cmd));
+ if (result.failed()) {
+ reply->setResult(result);
+ _bottom->sendUp(reply);
+ return;
+ }
+ assert(maxDocuments < _documents.size());
+ size_t documentCount = maxDocuments != 0 ? maxDocuments : _documents.size();
+ for (size_t i = 0; i < documentCount; ++i) {
+ reply->getEntries().push_back(
+ spi::DocEntry::LP(
+ new spi::DocEntry(
+ spi::Timestamp(1000 + i),
+ spi::NONE,
+ document::Document::UP(_documents[i]->clone()))));
+ }
+ if (documentCount == _documents.size() || overrideCompleted) {
+ reply->setCompleted();
+ }
+ _bottom->sendUp(reply);
+}
+
+template <typename T>
+std::vector<std::shared_ptr<T> >
+VisitorTest::fetchMultipleCommands(DummyStorageLink& link, size_t count)
+{
+ link.waitForMessages(count, 60);
+ std::vector<api::StorageMessage::SP> msgs(link.getCommandsOnce());
+ std::vector<std::shared_ptr<T> > fetched;
+ if (msgs.size() != count) {
+ std::ostringstream oss;
+ oss << "Expected "
+ << count
+ << " messages, got "
+ << msgs.size()
+ << ":\n";
+ for (size_t i = 0; i < msgs.size(); ++i) {
+ oss << i << ": " << *msgs[i] << "\n";
+ }
+ CPPUNIT_FAIL(oss.str());
+ }
+ for (size_t i = 0; i < count; ++i) {
+ std::shared_ptr<T> ret(std::dynamic_pointer_cast<T>(msgs[i]));
+ if (!ret) {
+ std::ostringstream oss;
+ oss << "Expected message of type "
+ << typeid(T).name()
+ << ", but got "
+ << msgs[0]->toString();
+ CPPUNIT_FAIL(oss.str());
+ }
+ fetched.push_back(ret);
+ }
+ return fetched;
+}
+
+template <typename T>
+std::shared_ptr<T>
+VisitorTest::fetchSingleCommand(DummyStorageLink& link)
+{
+ std::vector<std::shared_ptr<T> > ret(
+ fetchMultipleCommands<T>(link, 1));
+ return ret[0];
+}
+
+std::shared_ptr<api::CreateVisitorCommand>
+VisitorTest::makeCreateVisitor(const VisitorOptions& options)
+{
+ api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ new api::CreateVisitorCommand(options.visitorType, "testvis", ""));
+ cmd->addBucketToBeVisited(document::BucketId(16, 3));
+ cmd->setAddress(address);
+ cmd->setMaximumPendingReplyCount(UINT32_MAX);
+ cmd->setControlDestination("foo/bar");
+ return cmd;
+}
+
+void
+VisitorTest::sendCreateIteratorReply(uint64_t iteratorId)
+{
+ CreateIteratorCommand::SP createCmd(
+ fetchSingleCommand<CreateIteratorCommand>(*_bottom));
+ spi::IteratorId id(iteratorId);
+ api::StorageReply::SP reply(
+ new CreateIteratorReply(*createCmd, id));
+ _bottom->sendUp(reply);
+}
+
+void
+VisitorTest::testNormalUsage()
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+
+ CreateIteratorCommand::SP createCmd(
+ fetchSingleCommand<CreateIteratorCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(uint8_t(0), createCmd->getPriority()); // Highest pri
+ spi::IteratorId id(1234);
+ api::StorageReply::SP reply(
+ new CreateIteratorReply(*createCmd, id));
+ _bottom->sendUp(reply);
+
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+
+ sendGetIterReply(*getIterCmd);
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size());
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+ CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount());
+}
+
+void
+VisitorTest::testFailedCreateIterator()
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ cmd->addBucketToBeVisited(document::BucketId(16, 4));
+ _top->sendDown(cmd);
+
+ CreateIteratorCommand::SP createCmd(
+ fetchSingleCommand<CreateIteratorCommand>(*_bottom));
+ spi::IteratorId id(0);
+ api::StorageReply::SP reply(
+ new CreateIteratorReply(*createCmd, id));
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE));
+ _bottom->sendUp(reply);
+
+ verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE, 0, 0);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::testFailedGetIter()
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+
+ sendGetIterReply(*getIterCmd,
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND));
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::BUCKET_NOT_FOUND, 0, 0);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::testMultipleFailedGetIter()
+{
+ initializeTest(TestParams().iteratorsPerBucket(2));
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ std::vector<GetIterCommand::SP> getIterCmds(
+ fetchMultipleCommands<GetIterCommand>(*_bottom, 2));
+
+ sendGetIterReply(*getIterCmds[0],
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND));
+
+ // Wait for an "appropriate" amount of time so that wrongful logic
+ // will send a DestroyIteratorCommand before all pending GetIters
+ // have been replied to.
+ std::this_thread::sleep_for(100ms);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands());
+
+ sendGetIterReply(*getIterCmds[1],
+ api::ReturnCode(api::ReturnCode::BUCKET_DELETED));
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::testDocumentAPIClientError()
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ {
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+
+ sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1);
+ }
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::INTERNAL_FAILURE);
+ // INTERNAL_FAILURE is critical, so no visitor info sent
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+
+ std::this_thread::sleep_for(100ms);
+
+ {
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+
+ sendGetIterReply(*getIterCmd);
+ }
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::testNoDocumentAPIResendingForFailedVisitor()
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ {
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+
+ sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 2, true);
+ }
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ // Use non-critical result. Visitor info message should be received
+ // after we send a NOT_CONNECTED reply. Failing this message as well
+ // should cause the entire visitor to fail.
+ getMessagesAndReply(3, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::NOT_CONNECTED);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), infoMessages.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("[From content node 0] NOT_CONNECTED: Generic error"),
+ infoMessages[0]);
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::NOT_CONNECTED);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+ CPPUNIT_ASSERT_EQUAL(3L, getFailedVisitorDestinationReplyCount());
+}
+
+void
+VisitorTest::testIteratorCreatedForFailedVisitor()
+{
+ initializeTest(TestParams().iteratorsPerBucket(1).parallelBuckets(2));
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ cmd->addBucketToBeVisited(document::BucketId(16, 4));
+ _top->sendDown(cmd);
+
+ std::vector<CreateIteratorCommand::SP> createCmds(
+ fetchMultipleCommands<CreateIteratorCommand>(*_bottom, 2));
+ {
+ spi::IteratorId id(0);
+ api::StorageReply::SP reply(
+ new CreateIteratorReply(*createCmds[0], id));
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE));
+ _bottom->sendUp(reply);
+ }
+ {
+ spi::IteratorId id(1234);
+ api::StorageReply::SP reply(
+ new CreateIteratorReply(*createCmds[1], id));
+ _bottom->sendUp(reply);
+ }
+ // Want to immediately receive destroyiterator for newly created
+ // iterator, since we cannot use it anyway when the visitor has failed.
+ DestroyIteratorCommand::SP destroyCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::INTERNAL_FAILURE, 0, 0);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+/**
+ * Test that if a visitor fails to send a document API message outright
+ * (i.e. a case where it will never get a reply), the session is failed
+ * and the visitor terminates cleanly without counting the failed message
+ * as pending.
+ */
+void
+VisitorTest::testFailedDocumentAPISend()
+{
+ initializeTest(TestParams().autoReplyError(
+ mbus::Error(mbus::ErrorCode::HANDSHAKE_FAILED,
+ "abandon ship!")));
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ cmd->addBucketToBeVisited(document::BucketId(16, 4));
+ _top->sendDown(cmd);
+
+ sendCreateIteratorReply();
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+ sendGetIterReply(*getIterCmd,
+ api::ReturnCode(api::ReturnCode::OK),
+ 2,
+ true);
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(
+ static_cast<api::ReturnCode::Result>(
+ mbus::ErrorCode::HANDSHAKE_FAILED),
+ 0,
+ 0);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+ // We currently don't count failures to send in this metric; send failures
+ // indicate a message bus problem and already log a warning when they happen
+ CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount());
+}
+
+void
+VisitorTest::sendInitialCreateVisitorAndGetIterRound()
+{
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ {
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK),
+ 1, true);
+ }
+}
+
+void
+VisitorTest::testNoVisitorNotificationForTransientFailures()
+{
+ initializeTest();
+ sendInitialCreateVisitorAndGetIterRound();
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ // Have to make sure time increases in visitor thread so that resend
+ // times are reached.
+ _node->getClock().setFakeCycleMode();
+ // Should not get info message for BUCKET_DELETED, but resend of Put.
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::BUCKET_DELETED);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ // Should not get info message for BUCKET_NOT_FOUND, but resend of Put.
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::BUCKET_NOT_FOUND);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ // MessageBus error codes guaranteed to fit in return code result.
+ // Should not get info message for SESSION_BUSY, but resend of Put.
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ static_cast<api::ReturnCode::Result>(
+ mbus::ErrorCode::SESSION_BUSY));
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ // WRONG_DISTRIBUTION should not be reported, as it will happen all the
+ // time when initiating remote migrations et al.
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::WRONG_DISTRIBUTION);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+
+ // Complete message successfully to finish the visitor.
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::OK);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom);
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::testNotificationSentIfTransientErrorRetriedManyTimes()
+{
+ constexpr size_t retries(
+ Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY);
+
+ initializeTest();
+ sendInitialCreateVisitorAndGetIterRound();
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ // Have to make sure time increases in visitor thread so that resend
+ // times are reached.
+ _node->getClock().setFakeCycleMode();
+ for (size_t attempt = 0; attempt < retries; ++attempt) {
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::WRONG_DISTRIBUTION);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ }
+ // Should now have a client notification along for the ride.
+ // This has to be ACKed as OK or the visitor will fail.
+ getMessagesAndReply(2, getSession(0), docs, docIds, infoMessages,
+ api::ReturnCode::OK);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), infoMessages.size());
+ // TODO(vekterli) ideally we'd want to test that this happens only once
+ // per message, but this seems frustratingly complex to do currently.
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom);
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+std::shared_ptr<api::CreateVisitorReply>
+VisitorTest::doCompleteVisitingSession(
+ const std::shared_ptr<api::CreateVisitorCommand>& cmd)
+{
+ initializeTest();
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ sendGetIterReply(*getIterCmd,
+ api::ReturnCode(api::ReturnCode::OK),
+ 1,
+ true);
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages);
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ _top->waitForMessages(1, 60);
+ const msg_ptr_vector replies = _top->getRepliesOnce();
+ CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size());
+
+ std::shared_ptr<api::StorageMessage> msg(replies[0]);
+
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::VISITOR_CREATE_REPLY,
+ msg->getType());
+ return std::dynamic_pointer_cast<api::CreateVisitorReply>(msg);
+}
+
+void
+VisitorTest::testNoMbusTracingIfTraceLevelIsZero()
+{
+ std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
+ cmd->getTrace().setLevel(0);
+ auto reply = doCompleteVisitingSession(cmd);
+ CPPUNIT_ASSERT(reply->getTrace().getRoot().isEmpty());
+}
+
+void
+VisitorTest::testReplyContainsTraceIfTraceLevelAboveZero()
+{
+ std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
+ cmd->getTrace().setLevel(1);
+ auto reply = doCompleteVisitingSession(cmd);
+ CPPUNIT_ASSERT(!reply->getTrace().getRoot().isEmpty());
+}
+
+void
+VisitorTest::testNoMoreIteratorsSentWhileMemoryUsedAboveLimit()
+{
+ initializeTest(TestParams().maxVisitorMemoryUsage(1)
+ .parallelBuckets(1)
+ .iteratorsPerBucket(1));
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor());
+ _top->sendDown(cmd);
+ sendCreateIteratorReply();
+
+ GetIterCommand::SP getIterCmd(
+ fetchSingleCommand<GetIterCommand>(*_bottom));
+ sendGetIterReply(*getIterCmd,
+ api::ReturnCode(api::ReturnCode::OK),
+ 1);
+
+ // Pending Document API message towards client; memory usage should prevent
+ // visitor from sending down additional GetIter messages until the pending
+ // client message has been replied to and cleared from the internal state.
+ getSession(0).waitForMessages(1);
+ // Note that it's possible for this test to exhibit false negatives (but not
+ // false positives) since the _absence_ of a message means we don't have any
+ // kind of explicit barrier with which we can synchronize the test and the
+ // running visitor thread.
+ std::this_thread::sleep_for(100ms);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands());
+
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages);
+
+ // 2nd round of GetIter now allowed. Send reply indicating completion.
+ getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom);
+ sendGetIterReply(*getIterCmd,
+ api::ReturnCode(api::ReturnCode::OK),
+ 1,
+ true);
+
+ getMessagesAndReply(1, getSession(0), docs, docIds, infoMessages);
+
+ DestroyIteratorCommand::SP destroyIterCmd(
+ fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+
+ verifyCreateVisitorReply(api::ReturnCode::OK);
+ CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+}
+
+void
+VisitorTest::doTestVisitorInstanceHasConsistencyLevel(
+ vespalib::stringref visitorType,
+ spi::ReadConsistency expectedConsistency)
+{
+ initializeTest();
+ std::shared_ptr<api::CreateVisitorCommand> cmd(
+ makeCreateVisitor(VisitorOptions().withVisitorType(visitorType)));
+ _top->sendDown(cmd);
+
+ auto createCmd = fetchSingleCommand<CreateIteratorCommand>(*_bottom);
+ CPPUNIT_ASSERT_EQUAL(expectedConsistency,
+ createCmd->getReadConsistency());
+}
+
+void
+VisitorTest::testDumpVisitorInvokesStrongReadConsistencyIteration()
+{
+ doTestVisitorInstanceHasConsistencyLevel(
+ "dumpvisitor", spi::ReadConsistency::STRONG);
+}
+
+// NOTE: SearchVisitor cannot be tested here since it's in a separate module
+// which depends on _this_ module for compilation. Instead we let TestVisitor
+// use weak consistency, as this is just some internal stuff not used for/by
+// any external client use cases. Our primary concern is to test that each
+// visitor subclass might report its own read consistency requirement and that
+// this is carried along to the CreateIteratorCommand.
+void
+VisitorTest::testTestVisitorInvokesWeakReadConsistencyIteration()
+{
+ doTestVisitorInstanceHasConsistencyLevel(
+ "testvisitor", spi::ReadConsistency::WEAK);
+}
+
+} // namespace storage