about summary refs log tree commit diff stats
path: root/storage/src/tests/storageserver
diff options
context:
space:
mode:
author Jon Bratseth <bratseth@yahoo-inc.com> 2016-06-15 23:09:44 +0200
committer Jon Bratseth <bratseth@yahoo-inc.com> 2016-06-15 23:09:44 +0200
commit 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/storageserver
Publish
Diffstat (limited to 'storage/src/tests/storageserver')
-rw-r--r-- storage/src/tests/storageserver/.gitignore 13
-rw-r--r-- storage/src/tests/storageserver/CMakeLists.txt 17
-rw-r--r-- storage/src/tests/storageserver/bouncertest.cpp 285
-rw-r--r-- storage/src/tests/storageserver/bucketintegritycheckertest.cpp 302
-rw-r--r-- storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp 648
-rw-r--r-- storage/src/tests/storageserver/communicationmanagertest.cpp 235
-rw-r--r-- storage/src/tests/storageserver/documentapiconvertertest.cpp 529
-rw-r--r-- storage/src/tests/storageserver/dummystoragelink.cpp 182
-rw-r--r-- storage/src/tests/storageserver/dummystoragelink.h 115
-rw-r--r-- storage/src/tests/storageserver/mergethrottlertest.cpp 1566
-rw-r--r-- storage/src/tests/storageserver/priorityconvertertest.cpp 104
-rw-r--r-- storage/src/tests/storageserver/statemanagertest.cpp 264
-rw-r--r-- storage/src/tests/storageserver/statereportertest.cpp 279
-rw-r--r-- storage/src/tests/storageserver/testvisitormessagesession.cpp 78
-rw-r--r-- storage/src/tests/storageserver/testvisitormessagesession.h 79
15 files changed, 4696 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/.gitignore b/storage/src/tests/storageserver/.gitignore
new file mode 100644
index 00000000000..c4098089f09
--- /dev/null
+++ b/storage/src/tests/storageserver/.gitignore
@@ -0,0 +1,13 @@
+*.So
+*.lo
+*.o
+.*.swp
+.config.log
+.depend
+.depend.NEW
+.deps
+.libs
+Makefile
+filestorage
+testrunner
+testrunner.core
diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt
new file mode 100644
index 00000000000..2e327089b4c
--- /dev/null
+++ b/storage/src/tests/storageserver/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(storage_teststorageserver
+ SOURCES
+ communicationmanagertest.cpp
+ statemanagertest.cpp
+ documentapiconvertertest.cpp
+ mergethrottlertest.cpp
+ testvisitormessagesession.cpp
+ bouncertest.cpp
+ bucketintegritycheckertest.cpp
+ priorityconvertertest.cpp
+ statereportertest.cpp
+ changedbucketownershiphandlertest.cpp
+ DEPENDS
+ AFTER
+ storage_storageconfig
+)
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
new file mode 100644
index 00000000000..f00e4b19c31
--- /dev/null
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -0,0 +1,285 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <boost/pointer_cast.hpp>
+#include <cppunit/extensions/HelperMacros.h>
+#include <iostream>
+#include <string>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storageapi/message/stat.h>
+#include <vespa/vdslib/state/nodestate.h>
+#include <vespa/storage/storageserver/bouncer.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/storageapi/message/persistence.h>
+
+namespace storage {
+
+struct BouncerTest : public CppUnit::TestFixture {
+ std::unique_ptr<TestServiceLayerApp> _node;
+ std::unique_ptr<DummyStorageLink> _upper;
+ Bouncer* _manager;
+ DummyStorageLink* _lower;
+
+ BouncerTest();
+
+ void setUp();
+ void tearDown();
+
+ void testFutureTimestamp();
+ void testAllowNotifyBucketChangeEvenWhenDistributorDown();
+ void rejectLowerPrioritizedFeedMessagesWhenConfigured();
+ void doNotRejectHigherPrioritizedFeedMessagesThanConfigured();
+ void rejectionThresholdIsExclusive();
+ void onlyRejectFeedMessagesWhenConfigured();
+ void rejectionIsDisabledByDefaultInConfig();
+ void readOnlyOperationsAreNotRejected();
+ void internalOperationsAreNotRejected();
+ void outOfBoundsConfigValuesThrowException();
+
+ CPPUNIT_TEST_SUITE(BouncerTest);
+ CPPUNIT_TEST(testFutureTimestamp);
+ CPPUNIT_TEST(testAllowNotifyBucketChangeEvenWhenDistributorDown);
+ CPPUNIT_TEST(rejectLowerPrioritizedFeedMessagesWhenConfigured);
+ CPPUNIT_TEST(doNotRejectHigherPrioritizedFeedMessagesThanConfigured);
+ CPPUNIT_TEST(rejectionThresholdIsExclusive);
+ CPPUNIT_TEST(onlyRejectFeedMessagesWhenConfigured);
+ CPPUNIT_TEST(rejectionIsDisabledByDefaultInConfig);
+ CPPUNIT_TEST(readOnlyOperationsAreNotRejected);
+ CPPUNIT_TEST(internalOperationsAreNotRejected);
+ CPPUNIT_TEST(outOfBoundsConfigValuesThrowException);
+ CPPUNIT_TEST_SUITE_END();
+
+ using Priority = api::StorageMessage::Priority;
+
+ static constexpr int RejectionDisabledConfigValue = -1;
+
+ // Note: newThreshold is intentionally int (rather than Priority) in order
+ // to be able to test out of bounds values.
+ void configureRejectionThreshold(int newThreshold);
+
+ std::shared_ptr<api::StorageCommand> createDummyFeedMessage(
+ api::Timestamp timestamp,
+ Priority priority = 0);
+
+ void assertMessageBouncedWithRejection();
+ void assertMessageNotBounced();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(BouncerTest);
+
+BouncerTest::BouncerTest()
+ : _node(),
+ _upper(),
+ _manager(0),
+ _lower(0)
+{
+}
+
+void
+BouncerTest::setUp() {
+ try{
+ vdstestlib::DirConfig config(getStandardConfig(true));
+ _node.reset(new TestServiceLayerApp(
+ DiskCount(1), NodeIndex(2), config.getConfigId()));
+ _upper.reset(new DummyStorageLink());
+ _manager = new Bouncer(_node->getComponentRegister(),
+ config.getConfigId());
+ _lower = new DummyStorageLink();
+ _upper->push_back(std::unique_ptr<StorageLink>(_manager));
+ _upper->push_back(std::unique_ptr<StorageLink>(_lower));
+ _upper->open();
+ } catch (std::exception& e) {
+ std::cerr << "Failed to static initialize objects: " << e.what()
+ << "\n";
+ }
+ _node->getClock().setAbsoluteTimeInSeconds(10);
+}
+
+void
+BouncerTest::tearDown() {
+ _manager = 0;
+ _lower = 0;
+ _upper->close();
+ _upper->flush();
+ _upper.reset(0);
+ _node.reset(0);
+}
+
+std::shared_ptr<api::StorageCommand>
+BouncerTest::createDummyFeedMessage(api::Timestamp timestamp,
+ api::StorageMessage::Priority priority)
+{
+ auto cmd = std::make_shared<api::RemoveCommand>(
+ document::BucketId(0),
+ document::DocumentId("doc:foo:bar"),
+ timestamp);
+ cmd->setPriority(priority);
+ return cmd;
+}
+
+void
+BouncerTest::testFutureTimestamp()
+{
+
+ // Fail when future timestamps (more than 5 seconds) are received.
+ {
+ _upper->sendDown(createDummyFeedMessage(16 * 1000000));
+
+ CPPUNIT_ASSERT_EQUAL(1, (int)_upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(0, (int)_upper->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ static_cast<api::RemoveReply&>(*_upper->getReply(0)).
+ getResult().getResult());
+ _upper->reset();
+ }
+
+ // Verify that 1 second clock skew is OK
+ {
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000));
+
+ CPPUNIT_ASSERT_EQUAL(0, (int)_upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(1, (int)_lower->getNumCommands());
+ _lower->reset();
+ }
+
+ // Verify that past is OK
+ {
+ _upper->sendDown(createDummyFeedMessage(5 * 1000000));
+
+ CPPUNIT_ASSERT_EQUAL(1, (int)_lower->getNumCommands());
+ }
+
+
+}
+
+void
+BouncerTest::testAllowNotifyBucketChangeEvenWhenDistributorDown()
+{
+ lib::NodeState state(lib::NodeType::DISTRIBUTOR, lib::State::DOWN);
+ _node->getNodeStateUpdater().setReportedNodeState(state);
+ // Trigger Bouncer state update
+ auto clusterState = std::make_shared<lib::ClusterState>(
+ "distributor:3 storage:3");
+ _node->getNodeStateUpdater().setClusterState(clusterState);
+
+
+ document::BucketId bucket(16, 1234);
+ api::BucketInfo info(0x1, 0x2, 0x3);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, info);
+ _upper->sendDown(cmd);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _lower->getNumCommands());
+}
+
+void
+BouncerTest::assertMessageBouncedWithRejection()
+{
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::REJECTED,
+ static_cast<api::RemoveReply&>(*_upper->getReply(0)).
+ getResult().getResult());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumCommands());
+}
+
+void
+BouncerTest::assertMessageNotBounced()
+{
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _lower->getNumCommands());
+}
+
+void
+BouncerTest::configureRejectionThreshold(int newThreshold)
+{
+ using Builder = vespa::config::content::core::StorBouncerConfigBuilder;
+ auto config = std::make_unique<Builder>();
+ config->feedRejectionPriorityThreshold = newThreshold;
+ _manager->configure(std::move(config));
+}
+
+void
+BouncerTest::rejectLowerPrioritizedFeedMessagesWhenConfigured()
+{
+ configureRejectionThreshold(Priority(120));
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(121)));
+ assertMessageBouncedWithRejection();
+}
+
+void
+BouncerTest::doNotRejectHigherPrioritizedFeedMessagesThanConfigured()
+{
+ configureRejectionThreshold(Priority(120));
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(119)));
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::rejectionThresholdIsExclusive()
+{
+ configureRejectionThreshold(Priority(120));
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(120)));
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::onlyRejectFeedMessagesWhenConfigured()
+{
+ configureRejectionThreshold(RejectionDisabledConfigValue);
+ // A message with even the lowest priority should not be rejected.
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(255)));
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::rejectionIsDisabledByDefaultInConfig()
+{
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(255)));
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::readOnlyOperationsAreNotRejected()
+{
+ configureRejectionThreshold(Priority(1));
+ // StatBucket is an external operation, but it's not a mutating operation
+ // and should therefore not be blocked.
+ auto cmd = std::make_shared<api::StatBucketCommand>(
+ document::BucketId(16, 5), "");
+ cmd->setPriority(Priority(2));
+ _upper->sendDown(cmd);
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::internalOperationsAreNotRejected()
+{
+ configureRejectionThreshold(Priority(1));
+ document::BucketId bucket(16, 1234);
+ api::BucketInfo info(0x1, 0x2, 0x3);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, info);
+ cmd->setPriority(Priority(2));
+ _upper->sendDown(cmd);
+ assertMessageNotBounced();
+}
+
+void
+BouncerTest::outOfBoundsConfigValuesThrowException()
+{
+ try {
+ configureRejectionThreshold(256);
+ CPPUNIT_FAIL("Upper bound violation not caught");
+ } catch (config::InvalidConfigException) {}
+
+ try {
+ configureRejectionThreshold(-2);
+ CPPUNIT_FAIL("Lower bound violation not caught");
+ } catch (config::InvalidConfigException) {}
+}
+
+} // storage
+
diff --git a/storage/src/tests/storageserver/bucketintegritycheckertest.cpp b/storage/src/tests/storageserver/bucketintegritycheckertest.cpp
new file mode 100644
index 00000000000..88a5546b174
--- /dev/null
+++ b/storage/src/tests/storageserver/bucketintegritycheckertest.cpp
@@ -0,0 +1,302 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <boost/lexical_cast.hpp>
+#include <cppunit/extensions/HelperMacros.h>
+#include <vespa/log/log.h>
+#include <vespa/storage/bucketdb/bucketmanager.h>
+#include <vespa/storage/bucketdb/storbucketdb.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/storageserver/bucketintegritychecker.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/storagelinktest.h>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <tests/common/teststorageapp.h>
+
+LOG_SETUP(".test.bucketintegritychecker");
+
+namespace storage {
+
+struct BucketIntegrityCheckerTest : public CppUnit::TestFixture {
+ std::unique_ptr<vdstestlib::DirConfig> _config;
+ std::unique_ptr<TestServiceLayerApp> _node;
+ int _timeout; // Timeout in seconds before aborting
+
+ void setUp() {
+ _timeout = 60*2;
+ _config.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
+ _node.reset(new TestServiceLayerApp(DiskCount(256),
+ NodeIndex(0),
+ _config->getConfigId()));
+ }
+
+ void tearDown() {
+ LOG(info, "Finished test");
+ }
+
+ void testConfig();
+ void testBasicFunctionality();
+ void testTiming();
+
+ CPPUNIT_TEST_SUITE(BucketIntegrityCheckerTest);
+ CPPUNIT_TEST(testConfig);
+ CPPUNIT_TEST(testBasicFunctionality);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(BucketIntegrityCheckerTest);
+
+void BucketIntegrityCheckerTest::testConfig()
+{
+
+ // Verify that config is read correctly. Given config should not use
+ // any default values.
+ vdstestlib::DirConfig::Config& config(
+ _config->getConfig("stor-integritychecker"));
+ config.set("dailycyclestart", "60");
+ config.set("dailycyclestop", "360");
+ config.set("weeklycycle", "crRc-rc");
+ config.set("maxpending", "2");
+ config.set("mincycletime", "120");
+ config.set("requestdelay", "5");
+
+ BucketIntegrityChecker checker(_config->getConfigId(),
+ _node->getComponentRegister());
+ checker.setMaxThreadWaitTime(framework::MilliSecTime(10));
+ SchedulingOptions& opt(checker.getSchedulingOptions());
+ CPPUNIT_ASSERT_EQUAL(60u, opt._dailyCycleStart);
+ CPPUNIT_ASSERT_EQUAL(360u, opt._dailyCycleStop);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[0]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_CHEAP, opt._dailyStates[1]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_FULL, opt._dailyStates[2]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[3]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::DONT_RUN, opt._dailyStates[4]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_CHEAP, opt._dailyStates[5]);
+ CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[6]);
+ CPPUNIT_ASSERT_EQUAL(2u, opt._maxPendingCount);
+ CPPUNIT_ASSERT_EQUAL(framework::SecondTime(7200), opt._minCycleTime);
+ CPPUNIT_ASSERT_EQUAL(framework::SecondTime(5), opt._requestDelay);
+}
+
+namespace {
+ /**
+ * Calculate a date based on the following format:
+ * week<#> <day> <hh>:<mm>:<ss>
+ * Examples: "week3 mon 00:30:00"
+ * "week3 tue 04:20:00"
+ * "week9 thi 14:00:24"
+ */
+ time_t getDate(const std::string& datestring) {
+ vespalib::string rest(datestring);
+ int spacePos = rest.find(' ');
+ uint32_t week = strtoul(rest.substr(4, spacePos-4).c_str(), NULL, 0);
+ rest = rest.substr(spacePos+1);
+ vespalib::string wday(rest.substr(0,3));
+ rest = rest.substr(4);
+ uint32_t hours = strtoul(rest.substr(0, 2).c_str(), NULL, 0);
+ uint32_t minutes = strtoul(rest.substr(3, 2).c_str(), NULL, 0);
+ uint32_t seconds = strtoul(rest.substr(6, 2).c_str(), NULL, 0);
+ uint32_t day(0);
+ if (wday == "mon") { day = 1; }
+ else if (wday == "tue") { day = 2; }
+ else if (wday == "wed") { day = 3; }
+ else if (wday == "thi") { day = 4; }
+ else if (wday == "fri") { day = 5; }
+ else if (wday == "sat") { day = 6; }
+ else if (wday == "sun") { day = 0; }
+ else { assert(false); }
+ // Create a start time that points to the start of some week.
+ // A random sunday 00:00:00, which we will use as start of time
+ struct tm mytime;
+ memset(&mytime, 0, sizeof(mytime));
+ mytime.tm_year = 2008 - 1900;
+ mytime.tm_mon = 0;
+ mytime.tm_mday = 1;
+ mytime.tm_hour = 0;
+ mytime.tm_min = 0;
+ mytime.tm_sec = 0;
+ time_t startTime = timegm(&mytime);
+ CPPUNIT_ASSERT(gmtime_r(&startTime, &mytime));
+ while (mytime.tm_wday != 0) {
+ ++mytime.tm_mday;
+ startTime = timegm(&mytime);
+ CPPUNIT_ASSERT(gmtime_r(&startTime, &mytime));
+ }
+ // Add the wanted values to the start time
+ time_t resultTime = startTime;
+ resultTime += week * 7 * 24 * 60 * 60
+ + day * 24 * 60 * 60
+ + hours * 60 * 60
+ + minutes * 60
+ + seconds;
+ // std::cerr << "Time requested " << datestring << ". Got time "
+ // << framework::SecondTime(resultTime).toString() << "\n";
+ return resultTime;
+ }
+
+ void addBucketToDatabase(TestServiceLayerApp& server,
+ const document::BucketId& id, uint8_t disk,
+ uint32_t numDocs, uint32_t crc, uint32_t totalSize)
+ {
+ bucketdb::StorageBucketInfo info;
+ info.setBucketInfo(api::BucketInfo(crc, numDocs, totalSize));
+ info.disk = disk;
+ server.getStorageBucketDatabase().insert(id, info, "foo");
+ }
+
+
+ /**
+ * In tests wanting to only have one pending, only add buckets for one disk
+ * as pending is per disk. If so set singleDisk true.
+ */
+ void addBucketsToDatabase(TestServiceLayerApp& server, bool singleDisk) {
+ addBucketToDatabase(server, document::BucketId(16, 0x123), 0,
+ 14, 0x123, 1024);
+ addBucketToDatabase(server, document::BucketId(16, 0x234), 0,
+ 18, 0x234, 1024);
+ addBucketToDatabase(server, document::BucketId(16, 0x345), 0,
+ 11, 0x345, 2048);
+ addBucketToDatabase(server, document::BucketId(16, 0x456), 0,
+ 13, 0x456, 1280);
+ if (!singleDisk) {
+ addBucketToDatabase(server, document::BucketId(16, 0x567), 1,
+ 20, 0x567, 4096);
+ addBucketToDatabase(server, document::BucketId(16, 0x987), 254,
+ 8, 0x987, 65536);
+ }
+ }
+}
+
+void BucketIntegrityCheckerTest::testBasicFunctionality()
+{
+ _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:00:00"));
+ addBucketsToDatabase(*_node, false);
+ DummyStorageLink* dummyLink = 0;
+ {
+ std::unique_ptr<BucketIntegrityChecker> midLink(
+ new BucketIntegrityChecker("", _node->getComponentRegister()));
+ BucketIntegrityChecker& checker(*midLink);
+ checker.setMaxThreadWaitTime(framework::MilliSecTime(10));
+ // Setup and start checker
+ DummyStorageLink topLink;
+ topLink.push_back(StorageLink::UP(midLink.release()));
+ checker.push_back(std::unique_ptr<StorageLink>(
+ dummyLink = new DummyStorageLink()));
+ checker.getSchedulingOptions()._maxPendingCount = 2;
+ checker.getSchedulingOptions()._minCycleTime = framework::SecondTime(60 * 60);
+ topLink.open();
+ // Waiting for system to be initialized
+ FastOS_Thread::Sleep(10); // Give next message chance to come
+ ASSERT_COMMAND_COUNT(0, *dummyLink);
+ topLink.doneInit();
+ checker.bump();
+ // Should have started new run with 2 pending per disk
+ dummyLink->waitForMessages(4, _timeout);
+ FastOS_Thread::Sleep(10); // Give 5th message chance to come
+ ASSERT_COMMAND_COUNT(4, *dummyLink);
+ RepairBucketCommand *cmd1 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(0).get());
+ CPPUNIT_ASSERT_EQUAL(230, (int)cmd1->getPriority());
+ CPPUNIT_ASSERT(cmd1);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x234),
+ cmd1->getBucketId());
+ RepairBucketCommand *cmd2 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(1).get());
+ CPPUNIT_ASSERT(cmd2);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x456),
+ cmd2->getBucketId());
+ RepairBucketCommand *cmd3 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(2).get());
+ CPPUNIT_ASSERT(cmd3);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x567),
+ cmd3->getBucketId());
+ RepairBucketCommand *cmd4 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(3).get());
+ CPPUNIT_ASSERT(cmd4);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x987),
+ cmd4->getBucketId());
+
+ // Answering a message on disk with no more buckets does not trigger new
+ std::shared_ptr<RepairBucketReply> reply1(
+ new RepairBucketReply(*cmd3));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply1));
+ FastOS_Thread::Sleep(10); // Give next message chance to come
+ ASSERT_COMMAND_COUNT(4, *dummyLink);
+ // Answering a message on disk with more buckets trigger new repair
+ std::shared_ptr<RepairBucketReply> reply2(
+ new RepairBucketReply(*cmd2));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply2));
+ dummyLink->waitForMessages(5, _timeout);
+ FastOS_Thread::Sleep(10); // Give 6th message chance to come
+ ASSERT_COMMAND_COUNT(5, *dummyLink);
+ RepairBucketCommand *cmd5 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(4).get());
+ CPPUNIT_ASSERT(cmd5);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x345),
+ cmd5->getBucketId());
+ // Fail a repair, causing it to be resent later, but first continue
+ // with other bucket.
+ std::shared_ptr<RepairBucketReply> reply3(
+ new RepairBucketReply(*cmd1));
+ reply3->setResult(api::ReturnCode(api::ReturnCode::IGNORED));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply3));
+ dummyLink->waitForMessages(6, _timeout);
+ FastOS_Thread::Sleep(10); // Give 7th message chance to come
+ ASSERT_COMMAND_COUNT(6, *dummyLink);
+ RepairBucketCommand *cmd6 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(5).get());
+ CPPUNIT_ASSERT(cmd6);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x123),
+ cmd6->getBucketId());
+ // Fail a repair with not found. That is an acceptable return code.
+ // (No more requests as this was last for that disk)
+ std::shared_ptr<RepairBucketReply> reply4(
+ new RepairBucketReply(*cmd4));
+ reply3->setResult(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply4));
+ FastOS_Thread::Sleep(10); // Give 7th message chance to come
+ ASSERT_COMMAND_COUNT(6, *dummyLink);
+
+ // Send a repair reply that actually have corrected the bucket.
+ api::BucketInfo newInfo(0x3456, 4, 8192);
+ std::shared_ptr<RepairBucketReply> reply5(
+ new RepairBucketReply(*cmd5, newInfo));
+ reply5->setAltered(true);
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply5));
+
+ // Finish run. New iteration should not start yet as min
+ // cycle time has not passed
+ std::shared_ptr<RepairBucketReply> reply6(
+ new RepairBucketReply(*cmd6));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply6));
+ dummyLink->waitForMessages(7, _timeout);
+ ASSERT_COMMAND_COUNT(7, *dummyLink);
+ RepairBucketCommand *cmd7 = dynamic_cast<RepairBucketCommand*>(
+ dummyLink->getCommand(6).get());
+ CPPUNIT_ASSERT(cmd7);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x234),
+ cmd7->getBucketId());
+ std::shared_ptr<RepairBucketReply> reply7(
+ new RepairBucketReply(*cmd7));
+ CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply7));
+ FastOS_Thread::Sleep(10); // Give 8th message chance to come
+ ASSERT_COMMAND_COUNT(7, *dummyLink);
+
+ // Still not time for next iteration
+ dummyLink->reset();
+ _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:59:59"));
+ FastOS_Thread::Sleep(10); // Give new run chance to start
+ ASSERT_COMMAND_COUNT(0, *dummyLink);
+
+ // Pass time until next cycle should start
+ dummyLink->reset();
+ _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 01:00:00"));
+ dummyLink->waitForMessages(4, _timeout);
+ ASSERT_COMMAND_COUNT(4, *dummyLink);
+ }
+}
+
+} // storage
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
new file mode 100644
index 00000000000..3b83d71d8f3
--- /dev/null
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -0,0 +1,648 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/document/base/testdocman.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/storage/bucketdb/storbucketdb.h>
+#include <vespa/storage/persistence/messages.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/multioperation.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/storage/storageserver/changedbucketownershiphandler.h>
+#include <memory>
+
+namespace storage {
+
+class ChangedBucketOwnershipHandlerTest : public CppUnit::TestFixture
+{
+ std::unique_ptr<TestServiceLayerApp> _app;
+ std::unique_ptr<DummyStorageLink> _top;
+ ChangedBucketOwnershipHandler* _handler;
+ DummyStorageLink* _bottom;
+ document::TestDocMan _testDocRepo;
+
+ CPPUNIT_TEST_SUITE(ChangedBucketOwnershipHandlerTest);
+ CPPUNIT_TEST(testEnumerateBucketsBelongingOnChangedNodes);
+ CPPUNIT_TEST(testNoPreExistingClusterState);
+ CPPUNIT_TEST(testNoAvailableDistributorsInCurrentState);
+ CPPUNIT_TEST(testNoAvailableDistributorsInCurrentAndNewState);
+ CPPUNIT_TEST(testDownEdgeToNoAvailableDistributors);
+ CPPUNIT_TEST(testOwnershipChangedOnDistributorUpEdge);
+ CPPUNIT_TEST(testDistributionConfigChangeUpdatesOwnership);
+ CPPUNIT_TEST(testAbortOpsWhenNoClusterStateSet);
+ CPPUNIT_TEST(testAbortOutdatedSplit);
+ CPPUNIT_TEST(testAbortOutdatedJoin);
+ CPPUNIT_TEST(testAbortOutdatedSetBucketState);
+ CPPUNIT_TEST(testAbortOutdatedCreateBucket);
+ CPPUNIT_TEST(testAbortOutdatedDeleteBucket);
+ CPPUNIT_TEST(testAbortOutdatedMergeBucket);
+ CPPUNIT_TEST(testAbortOutdatedRemoveLocation);
+ CPPUNIT_TEST(testIdealStateAbortsAreConfigurable);
+ CPPUNIT_TEST(testAbortOutdatedPutOperation);
+ CPPUNIT_TEST(testAbortOutdatedMultiOperation);
+ CPPUNIT_TEST(testAbortOutdatedUpdateCommand);
+ CPPUNIT_TEST(testAbortOutdatedRemoveCommand);
+ CPPUNIT_TEST(testAbortOutdatedRevertCommand);
+ CPPUNIT_TEST(testIdealStateAbortUpdatesMetric);
+ CPPUNIT_TEST(testExternalLoadOpAbortUpdatesMetric);
+ CPPUNIT_TEST(testExternalLoadOpAbortsAreConfigurable);
+ CPPUNIT_TEST_SUITE_END();
+
+ // TODO test: down edge triggered on cluster state with cluster down?
+
+ std::vector<document::BucketId> insertBuckets(
+ uint32_t numBuckets,
+ uint16_t wantedOwner,
+ const lib::ClusterState& state);
+
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
+ const lib::ClusterState& state) const
+ {
+ return std::make_shared<api::SetSystemStateCommand>(state);
+ }
+
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
+ const std::string& stateStr) const
+ {
+ return createStateCmd(lib::ClusterState(stateStr));
+ }
+
+ void applyDistribution(Redundancy, NodeCount);
+ void applyClusterState(const lib::ClusterState&);
+
+ document::BucketId nextOwnedBucket(
+ uint16_t wantedOwner,
+ const lib::ClusterState& state,
+ const document::BucketId& lastId) const;
+
+ document::BucketId getBucketToAbort() const;
+ document::BucketId getBucketToAllow() const;
+
+ void sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex);
+
+ template <typename MsgType, typename... MsgParams>
+ bool changeAbortsMessage(MsgParams&&... params);
+
+ lib::ClusterState getDefaultTestClusterState() const {
+ return lib::ClusterState("distributor:4 storage:1");
+ }
+
+public:
+ void testEnumerateBucketsBelongingOnChangedNodes();
+ void testNoPreExistingClusterState();
+ void testNoAvailableDistributorsInCurrentState();
+ void testNoAvailableDistributorsInCurrentAndNewState();
+ void testDownEdgeToNoAvailableDistributors();
+ void testOwnershipChangedOnDistributorUpEdge();
+ void testDistributionConfigChangeUpdatesOwnership();
+ void testAbortOpsWhenNoClusterStateSet();
+ void testAbortOutdatedSplit();
+ void testAbortOutdatedJoin();
+ void testAbortOutdatedSetBucketState();
+ void testAbortOutdatedCreateBucket();
+ void testAbortOutdatedDeleteBucket();
+ void testAbortOutdatedMergeBucket();
+ void testAbortOutdatedRemoveLocation();
+ void testIdealStateAbortsAreConfigurable();
+ void testAbortOutdatedPutOperation();
+ void testAbortOutdatedMultiOperation();
+ void testAbortOutdatedUpdateCommand();
+ void testAbortOutdatedRemoveCommand();
+ void testAbortOutdatedRevertCommand();
+ void testIdealStateAbortUpdatesMetric();
+ void testExternalLoadOpAbortUpdatesMetric();
+ void testExternalLoadOpAbortsAreConfigurable();
+
+ void setUp();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(ChangedBucketOwnershipHandlerTest);
+
+document::BucketId
+ChangedBucketOwnershipHandlerTest::nextOwnedBucket(
+ uint16_t wantedOwner,
+ const lib::ClusterState& state,
+ const document::BucketId& lastId) const
+{
+ uint32_t idx(lastId.getId() + 1);
+ while (true) {
+ document::BucketId candidate(16, idx);
+ uint16_t owner(_app->getDistribution()->getIdealDistributorNode(
+ state, candidate));
+ if (owner == wantedOwner) {
+ return candidate;
+ }
+ ++idx;
+ }
+ assert(!"should never get here");
+}
+
+std::vector<document::BucketId>
+ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets,
+ uint16_t wantedOwner,
+ const lib::ClusterState& state)
+{
+ std::vector<document::BucketId> inserted;
+ document::BucketId bucket;
+ while (inserted.size() < numBuckets) {
+ bucket = nextOwnedBucket(wantedOwner, state, bucket);
+
+ bucketdb::StorageBucketInfo sbi;
+ sbi.setBucketInfo(api::BucketInfo(1, 2, 3));
+ sbi.disk = 0;
+ _app->getStorageBucketDatabase().insert(bucket, sbi, "test");
+ inserted.push_back(bucket);
+ }
+ return inserted;
+}
+
+void
+ChangedBucketOwnershipHandlerTest::setUp()
+{
+ vdstestlib::DirConfig config(getStandardConfig(true));
+
+ _app.reset(new TestServiceLayerApp);
+ _top.reset(new DummyStorageLink);
+ _handler = new ChangedBucketOwnershipHandler(config.getConfigId(),
+ _app->getComponentRegister());
+ _top->push_back(std::unique_ptr<StorageLink>(_handler));
+ _bottom = new DummyStorageLink;
+ _handler->push_back(std::unique_ptr<StorageLink>(_bottom));
+ _top->open();
+
+ // Ensure we're not dependent on config schema default values.
+ std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> pconfig(
+ new vespa::config::content::PersistenceConfigBuilder);
+ pconfig->abortOutdatedMutatingIdealStateOps = true;
+ pconfig->abortOutdatedMutatingExternalLoadOps = true;
+ _handler->configure(std::move(pconfig));
+}
+
+namespace {
+
+template <typename Set, typename K>
+bool has(const Set& s, const K& key) {
+ return s.find(key) != s.end();
+}
+
+template <typename Vec>
+bool
+hasAbortedAllOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
+{
+ for (auto& b : v) {
+ if (!cmd->shouldAbort(b)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+template <typename Vec>
+bool
+hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
+{
+ for (auto& b : v) {
+ if (cmd->shouldAbort(b)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool
+hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) {
+ if (link.getNumCommands() != 1) {
+ std::cerr << "expected 1 command, found"
+ << link.getNumCommands() << "\n";
+ }
+ api::SetSystemStateCommand::SP cmd(
+ std::dynamic_pointer_cast<api::SetSystemStateCommand>(
+ link.getCommand(0)));
+ return (cmd.get() != 0);
+}
+
+}
+
+void
+ChangedBucketOwnershipHandlerTest::applyDistribution(
+ Redundancy redundancy, NodeCount nodeCount)
+{
+ _app->setDistribution(redundancy, nodeCount);
+ _handler->storageDistributionChanged();
+}
+
+void
+ChangedBucketOwnershipHandlerTest::applyClusterState(
+ const lib::ClusterState& state)
+{
+ _app->setClusterState(state);
+ _handler->reloadClusterState();
+}
+
+// Taking distributors 1 and 3 down must produce an abort command that covers
+// exactly the buckets those distributors owned, leaving buckets owned by
+// distributors 0 and 2 untouched.
+void
+ChangedBucketOwnershipHandlerTest::testEnumerateBucketsBelongingOnChangedNodes()
+{
+ lib::ClusterState stateBefore("distributor:4 storage:1");
+ applyDistribution(Redundancy(1), NodeCount(4));
+ applyClusterState(stateBefore);
+ auto node1Buckets(insertBuckets(2, 1, stateBefore));
+ auto node3Buckets(insertBuckets(2, 3, stateBefore));
+ // Add some buckets that will not be part of the change set
+ auto node0Buckets(insertBuckets(3, 0, stateBefore));
+ auto node2Buckets(insertBuckets(2, 2, stateBefore));
+
+ _top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1"));
+ // TODO: refactor into own function
+ // Expect the abort command followed by the state command itself.
+ CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands());
+ AbortBucketOperationsCommand::SP cmd(
+ std::dynamic_pointer_cast<AbortBucketOperationsCommand>(
+ _bottom->getCommand(0)));
+ CPPUNIT_ASSERT(cmd.get() != 0);
+
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets));
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node3Buckets));
+ CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node0Buckets));
+ CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node2Buckets));
+
+ // Handler must swallow abort replies
+ _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release()));
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _top->getNumReplies());
+}
+
+// Without a pre-existing cluster state there is nothing to diff ownership
+// against, so no abort command is generated; only the state command itself
+// should be forwarded down.
+void
+ChangedBucketOwnershipHandlerTest::testNoPreExistingClusterState()
+{
+ applyDistribution(Redundancy(1), NodeCount(4));
+ lib::ClusterState stateBefore("distributor:4 storage:1");
+ insertBuckets(2, 1, stateBefore);
+ insertBuckets(3, 0, stateBefore);
+ insertBuckets(2, 2, stateBefore);
+
+ _top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1"));
+ CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom));
+}
+
+/**
+ * When current state has no distributors and we receive a state with one or
+ * more distributors, we do not send any abort messages since this should
+ * already have been done on the down-edge.
+ */
+// See the comment above: current state has all distributors down, so the
+// up-edge transition must not emit any abort command.
+void
+ChangedBucketOwnershipHandlerTest::testNoAvailableDistributorsInCurrentState()
+{
+ applyDistribution(Redundancy(1), NodeCount(3));
+ lib::ClusterState insertedState("distributor:3 storage:1");
+ insertBuckets(2, 0, insertedState);
+ insertBuckets(2, 1, insertedState);
+ insertBuckets(2, 2, insertedState);
+ lib::ClusterState downState("distributor:3 .0.s:d .1.s:d .2.s:d storage:1");
+ // Set state directly (bypassing the handler) to model a pre-existing
+ // all-distributors-down state.
+ _app->setClusterState(downState);
+
+ _top->sendDown(createStateCmd("distributor:3 .1.s:d storage:1"));
+ CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom));
+}
+
+// Neither the current state (all distributors stopping/down) nor the new state
+// (all down) has any available distributor, so no aborts should be sent; only
+// the state command itself is forwarded.
+void
+ChangedBucketOwnershipHandlerTest::testNoAvailableDistributorsInCurrentAndNewState()
+{
+ applyDistribution(Redundancy(1), NodeCount(3));
+ lib::ClusterState insertedState("distributor:3 storage:1");
+ insertBuckets(2, 0, insertedState);
+ insertBuckets(2, 1, insertedState);
+ insertBuckets(2, 2, insertedState);
+ lib::ClusterState stateBefore("distributor:3 .0.s:s .1.s:s .2.s:d storage:1");
+ applyClusterState(stateBefore);
+ lib::ClusterState downState("distributor:3 .0.s:d .1.s:d .2.s:d storage:1");
+
+ _top->sendDown(createStateCmd(downState));
+ CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom));
+}
+
+// On the down-edge to a state with no available distributors (down/stopping),
+// every bucket loses its owner, so the abort command must cover all buckets.
+void
+ChangedBucketOwnershipHandlerTest::testDownEdgeToNoAvailableDistributors()
+{
+ lib::ClusterState insertedState("distributor:3 storage:1");
+ applyDistribution(Redundancy(1), NodeCount(3));
+ applyClusterState(insertedState);
+ auto node0Buckets(insertBuckets(2, 0, insertedState));
+ auto node1Buckets(insertBuckets(2, 1, insertedState));
+ auto node2Buckets(insertBuckets(2, 2, insertedState));
+ lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1");
+
+ _top->sendDown(createStateCmd(downState));
+ // TODO: refactor into own function
+ // Expect the abort command followed by the state command itself.
+ CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands());
+ AbortBucketOperationsCommand::SP cmd(
+ std::dynamic_pointer_cast<AbortBucketOperationsCommand>(
+ _bottom->getCommand(0)));
+ CPPUNIT_ASSERT(cmd.get() != 0);
+
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node0Buckets));
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets));
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node2Buckets));
+}
+
+// When distributor 1 comes back up it takes over ownership of some buckets;
+// operations on exactly those buckets must be aborted, while buckets still
+// owned by distributors 0 and 2 are left alone.
+void
+ChangedBucketOwnershipHandlerTest::testOwnershipChangedOnDistributorUpEdge()
+{
+ lib::ClusterState stateBefore(
+ "version:10 distributor:4 .1.s:d storage:4 .1.s:d");
+ lib::ClusterState stateAfter(
+ "version:11 distributor:4 .1.t:1369990247 storage:4 .1.s:d");
+ applyDistribution(Redundancy(1), NodeCount(4));
+ applyClusterState(stateBefore);
+ // Add buckets that will belong to distributor 1 after it has come back up
+ auto node1Buckets(insertBuckets(2, 1, stateAfter));
+ // Add some buckets that will not be part of the change set
+ auto node0Buckets(insertBuckets(3, 0, stateAfter));
+ auto node2Buckets(insertBuckets(2, 2, stateAfter));
+
+ _top->sendDown(createStateCmd(stateAfter));
+ // TODO: refactor into own function
+ // Expect the abort command followed by the state command itself.
+ CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands());
+ AbortBucketOperationsCommand::SP cmd(
+ std::dynamic_pointer_cast<AbortBucketOperationsCommand>(
+ _bottom->getCommand(0)));
+ CPPUNIT_ASSERT(cmd.get() != 0);
+
+ CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets));
+ CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node0Buckets));
+ CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node2Buckets));
+
+ // Handler must swallow abort replies
+ _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release()));
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _top->getNumReplies());
+}
+
+// Sends a CreateBucket command as if issued by `fromDistributorIndex` and
+// asserts that it is bounced back with an ABORTED return code.
+void
+ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(
+ uint16_t fromDistributorIndex)
+{
+ document::BucketId bucket(16, 6786);
+ auto msg = std::make_shared<api::CreateBucketCommand>(bucket);
+ msg->setSourceIndex(fromDistributorIndex);
+
+ _top->sendDown(msg);
+ std::vector<api::StorageMessage::SP> replies(_top->getRepliesOnce());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size());
+ api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*replies[0]));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ reply.getResult().getResult());
+}
+
+// With no cluster state ever applied, ownership cannot be determined, so
+// mutating operations must be aborted outright.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOpsWhenNoClusterStateSet()
+{
+ sendAndExpectAbortedCreateBucket(1);
+}
+
+// A distribution config change alone (no cluster state change) must also
+// update the ownership mapping used for abort decisions.
+void
+ChangedBucketOwnershipHandlerTest::testDistributionConfigChangeUpdatesOwnership()
+{
+ lib::ClusterState insertedState("distributor:3 storage:1");
+ applyClusterState(insertedState);
+ applyDistribution(Redundancy(1), NodeCount(3));
+
+ // Apply new distribution config containing only 1 distributor, meaning
+ // any messages sent from >1 must be aborted.
+ applyDistribution(Redundancy(1), NodeCount(1));
+ sendAndExpectAbortedCreateBucket(2);
+}
+
+/**
+ * Generate and dispatch a message of the given type with the provided
+ * arguments as if that message was sent from distributor 1. Messages will
+ * be checked as if the state contains 4 distributors in Up state. This
+ * means that it suffices to send in a message with a bucket that is not
+ * owned by distributor 1 in this state to trigger an abort.
+ */
+template <typename MsgType, typename... MsgParams>
+bool
+ChangedBucketOwnershipHandlerTest::changeAbortsMessage(MsgParams&&... params)
+{
+ auto msg = std::make_shared<MsgType>(std::forward<MsgParams>(params)...);
+ msg->setSourceIndex(1);
+
+ applyDistribution(Redundancy(1), NodeCount(4));
+ applyClusterState(getDefaultTestClusterState());
+
+ _top->sendDown(msg);
+ std::vector<api::StorageMessage::SP> replies(_top->getRepliesOnce());
+ // Test is single-threaded, no need to do any waiting.
+ if (replies.empty()) {
+ // Message passed through untouched; it was not aborted.
+ return false;
+ } else {
+ CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size());
+ // Make sure the message was actually aborted and not bounced with
+ // some other arbitrary failure code.
+ api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*replies[0]));
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ reply.getResult().getResult());
+ return true;
+ }
+}
+
+/**
+ * Returns a bucket that is not owned by the sending distributor (1). More
+ * specifically, it returns a bucket that is owned by distributor 2.
+ */
+document::BucketId
+ChangedBucketOwnershipHandlerTest::getBucketToAbort() const
+{
+ lib::ClusterState state(getDefaultTestClusterState());
+ // First bucket owned by distributor 2, i.e. not by the sender (1).
+ return nextOwnedBucket(2, state, document::BucketId());
+}
+
+/**
+ * Returns a bucket that _is_ owned by distributor 1 and should thus be
+ * allowed through.
+ */
+document::BucketId
+ChangedBucketOwnershipHandlerTest::getBucketToAllow() const
+{
+ lib::ClusterState state(getDefaultTestClusterState());
+ // First bucket owned by the sending distributor (1) itself.
+ return nextOwnedBucket(1, state, document::BucketId());
+}
+
+// Split for a bucket the sender no longer owns is aborted; an owned bucket
+// passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedSplit()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::SplitBucketCommand>(
+ getBucketToAbort()));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::SplitBucketCommand>(
+ getBucketToAllow()));
+}
+
+// Join for a bucket the sender no longer owns is aborted; an owned bucket
+// passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedJoin()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::JoinBucketsCommand>(
+ getBucketToAbort()));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::JoinBucketsCommand>(
+ getBucketToAllow()));
+}
+
+// SetBucketState for a bucket the sender no longer owns is aborted; an owned
+// bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedSetBucketState()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::SetBucketStateCommand>(
+ getBucketToAbort(), api::SetBucketStateCommand::ACTIVE));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::SetBucketStateCommand>(
+ getBucketToAllow(), api::SetBucketStateCommand::ACTIVE));
+}
+
+// CreateBucket for a bucket the sender no longer owns is aborted; an owned
+// bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedCreateBucket()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::CreateBucketCommand>(
+ getBucketToAbort()));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::CreateBucketCommand>(
+ getBucketToAllow()));
+}
+
+// DeleteBucket for a bucket the sender no longer owns is aborted; an owned
+// bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedDeleteBucket()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::DeleteBucketCommand>(
+ getBucketToAbort()));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::DeleteBucketCommand>(
+ getBucketToAllow()));
+}
+
+// MergeBucket for a bucket the sender no longer owns is aborted; an owned
+// bucket passes through. The (empty) node list is irrelevant to the decision.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedMergeBucket()
+{
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ CPPUNIT_ASSERT(changeAbortsMessage<api::MergeBucketCommand>(
+ getBucketToAbort(), nodes, 0));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::MergeBucketCommand>(
+ getBucketToAllow(), nodes, 0));
+}
+
+/**
+ * RemoveLocation is technically an external load class, but since it's also
+ * used as the backing operation for GC we have to treat it as if it were an
+ * ideal state operation class.
+ */
+// RemoveLocation must follow the same abort rules as ideal state ops (see the
+// comment above for the rationale).
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedRemoveLocation()
+{
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveLocationCommand>(
+ "foo", getBucketToAbort()));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveLocationCommand>(
+ "foo", getBucketToAllow()));
+}
+
+// Disabling abortOutdatedMutatingIdealStateOps via config must let outdated
+// ideal state operations through even after an ownership change.
+void
+ChangedBucketOwnershipHandlerTest::testIdealStateAbortsAreConfigurable()
+{
+ std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> config(
+ new vespa::config::content::PersistenceConfigBuilder);
+ config->abortOutdatedMutatingIdealStateOps = false;
+ _handler->configure(std::move(config));
+ // Should not abort operation, even when ownership has changed.
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::CreateBucketCommand>(
+ getBucketToAbort()));
+}
+
+// External load: Put to a bucket the sender no longer owns is aborted; an
+// owned bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedPutOperation()
+{
+ document::Document::SP doc(_testDocRepo.createRandomDocumentAtLocation(1));
+ CPPUNIT_ASSERT(changeAbortsMessage<api::PutCommand>(
+ getBucketToAbort(), doc, api::Timestamp(1234)));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::PutCommand>(
+ getBucketToAllow(), doc, api::Timestamp(1234)));
+}
+
+// External load: MultiOperation targeting a no-longer-owned bucket is aborted;
+// an owned bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedMultiOperation()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::MultiOperationCommand>(
+ _testDocRepo.getTypeRepoSP(), getBucketToAbort(), 1024));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::MultiOperationCommand>(
+ _testDocRepo.getTypeRepoSP(), getBucketToAllow(), 1024));
+}
+
+// External load: Update to a bucket the sender no longer owns is aborted; an
+// owned bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedUpdateCommand()
+{
+ const document::DocumentType* docType(_testDocRepo.getTypeRepo()
+ .getDocumentType("testdoctype1"));
+ document::DocumentId docId("id:foo:testdoctype1::bar");
+ document::DocumentUpdate::SP update(
+ std::make_shared<document::DocumentUpdate>(*docType, docId));
+ CPPUNIT_ASSERT(changeAbortsMessage<api::UpdateCommand>(
+ getBucketToAbort(), update, api::Timestamp(1234)));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::UpdateCommand>(
+ getBucketToAllow(), update, api::Timestamp(1234)));
+}
+
+// External load: Remove for a bucket the sender no longer owns is aborted; an
+// owned bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedRemoveCommand()
+{
+ document::DocumentId docId("id:foo:testdoctype1::bar");
+ CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>(
+ getBucketToAbort(), docId, api::Timestamp(1234)));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>(
+ getBucketToAllow(), docId, api::Timestamp(1234)));
+}
+
+// External load: Revert for a bucket the sender no longer owns is aborted; an
+// owned bucket passes through.
+void
+ChangedBucketOwnershipHandlerTest::testAbortOutdatedRevertCommand()
+{
+ std::vector<api::Timestamp> timestamps;
+ CPPUNIT_ASSERT(changeAbortsMessage<api::RevertCommand>(
+ getBucketToAbort(), timestamps));
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::RevertCommand>(
+ getBucketToAllow(), timestamps));
+}
+
+// Aborting an ideal state op (Split) must bump idealStateOpsAborted only,
+// leaving the external load counter at zero.
+void
+ChangedBucketOwnershipHandlerTest::testIdealStateAbortUpdatesMetric()
+{
+ CPPUNIT_ASSERT(changeAbortsMessage<api::SplitBucketCommand>(
+ getBucketToAbort()));
+ CPPUNIT_ASSERT_EQUAL(
+ uint64_t(1),
+ _handler->getMetrics().idealStateOpsAborted.getValue());
+ CPPUNIT_ASSERT_EQUAL(
+ uint64_t(0),
+ _handler->getMetrics().externalLoadOpsAborted.getValue());
+}
+
+// Aborting an external load op (Remove) must bump externalLoadOpsAborted only,
+// leaving the ideal state counter at zero.
+void
+ChangedBucketOwnershipHandlerTest::testExternalLoadOpAbortUpdatesMetric()
+{
+ document::DocumentId docId("id:foo:testdoctype1::bar");
+ CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>(
+ getBucketToAbort(), docId, api::Timestamp(1234)));
+ CPPUNIT_ASSERT_EQUAL(
+ uint64_t(0),
+ _handler->getMetrics().idealStateOpsAborted.getValue());
+ CPPUNIT_ASSERT_EQUAL(
+ uint64_t(1),
+ _handler->getMetrics().externalLoadOpsAborted.getValue());
+}
+
+// Disabling abortOutdatedMutatingExternalLoadOps via config must let outdated
+// external load operations through even after an ownership change.
+void
+ChangedBucketOwnershipHandlerTest::testExternalLoadOpAbortsAreConfigurable()
+{
+ std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> config(
+ new vespa::config::content::PersistenceConfigBuilder);
+ config->abortOutdatedMutatingExternalLoadOps = false;
+ _handler->configure(std::move(config));
+ // Should not abort operation, even when ownership has changed.
+ document::DocumentId docId("id:foo:testdoctype1::bar");
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>(
+ getBucketToAbort(), docId, api::Timestamp(1234)));
+}
+
+} // storage
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
new file mode 100644
index 00000000000..fe062a9ee30
--- /dev/null
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -0,0 +1,235 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/storage/storageserver/communicationmanager.h>
+
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
+
+namespace storage {
+
+// CppUnit fixture for CommunicationManager: end-to-end message routing
+// between a distributor and a storage node, propagation of pending-limit
+// config to messagebus, and command/reply dequeueing order.
+struct CommunicationManagerTest : public CppUnit::TestFixture {
+ void testSimple();
+ void testDistPendingLimitConfigsArePropagatedToMessageBus();
+ void testStorPendingLimitConfigsArePropagatedToMessageBus();
+ void testCommandsAreDequeuedInPriorityOrder();
+ void testRepliesAreDequeuedInFifoOrder();
+
+ // Generous upper bound when waiting for messages, to avoid spurious
+ // timeouts on slow/loaded machines.
+ static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60;
+
+ // Shared implementation for the two config-propagation tests below;
+ // `isContentNode` selects content vs distributor node setup.
+ void doTestConfigPropagation(bool isContentNode);
+
+ // Builds a dummy Get command with the given priority, addressed to
+ // storage node 1; used as payload for the queue-ordering tests.
+ std::shared_ptr<api::StorageCommand> createDummyCommand(
+ api::StorageMessage::Priority priority)
+ {
+ auto cmd = std::make_shared<api::GetCommand>(
+ document::BucketId(0),
+ document::DocumentId("doc::mydoc"),
+ "[all]");
+ cmd->setAddress(api::StorageMessageAddress(
+ "storage", lib::NodeType::STORAGE, 1));
+ cmd->setPriority(priority);
+ return cmd;
+ }
+
+ CPPUNIT_TEST_SUITE(CommunicationManagerTest);
+ CPPUNIT_TEST(testSimple);
+ CPPUNIT_TEST(testDistPendingLimitConfigsArePropagatedToMessageBus);
+ CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus);
+ CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder);
+ CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(CommunicationManagerTest);
+
+// Round-trips a Get command from a "distributor" node to a "storage" node
+// through two CommunicationManagers sharing a slobrok, then sends the reply
+// back and verifies it arrives.
+void CommunicationManagerTest::testSimple()
+{
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig distConfig(getStandardConfig(false));
+ vdstestlib::DirConfig storConfig(getStandardConfig(true));
+ distConfig.getConfig("stor-server").set("node_index", "1");
+ storConfig.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(distConfig, slobrok);
+ addSlobrokConfig(storConfig, slobrok);
+
+ // Set up a "distributor" and a "storage" node with communication
+ // managers and a dummy storage link below we can use for testing.
+ TestServiceLayerApp storNode(storConfig.getConfigId());
+ TestDistributorApp distNode(distConfig.getConfigId());
+
+ CommunicationManager distributor(distNode.getComponentRegister(),
+ distConfig.getConfigId());
+ CommunicationManager storage(storNode.getComponentRegister(),
+ storConfig.getConfigId());
+ // Ownership of the raw links is transferred to the managers below.
+ DummyStorageLink *distributorLink = new DummyStorageLink();
+ DummyStorageLink *storageLink = new DummyStorageLink();
+ distributor.push_back(std::unique_ptr<StorageLink>(distributorLink));
+ storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+ distributor.open();
+ storage.open();
+
+ // Give both sides time to register in slobrok before sending.
+ FastOS_Thread::Sleep(1000);
+
+ // Send a message through from distributor to storage
+ std::shared_ptr<api::StorageCommand> cmd(
+ new api::GetCommand(
+ document::BucketId(0), document::DocumentId("doc::mydoc"), "[all]"));
+ cmd->setAddress(api::StorageMessageAddress(
+ "storage", lib::NodeType::STORAGE, 1));
+ distributorLink->sendUp(cmd);
+ storageLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC);
+ CPPUNIT_ASSERT(storageLink->getNumCommands() > 0);
+ std::shared_ptr<api::StorageCommand> cmd2(
+ std::dynamic_pointer_cast<api::StorageCommand>(
+ storageLink->getCommand(0)));
+ CPPUNIT_ASSERT_EQUAL(
+ vespalib::string("doc::mydoc"),
+ static_cast<api::GetCommand&>(*cmd2).getDocumentId().toString());
+ // Reply to the message
+ std::shared_ptr<api::StorageReply> reply(cmd2->makeReply().release());
+ // NOTE(review): the same reply is sent up twice here — looks accidental;
+ // confirm whether duplicate-reply handling is actually intended.
+ storageLink->sendUp(reply);
+ storageLink->sendUp(reply);
+ distributorLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC);
+ CPPUNIT_ASSERT(distributorLink->getNumCommands() > 0);
+ // NOTE(review): the reply is fetched via getCommand(0) — presumably the
+ // dummy link stores all received messages in one list; verify.
+ std::shared_ptr<api::GetReply> reply2(
+ std::dynamic_pointer_cast<api::GetReply>(
+ distributorLink->getCommand(0)));
+ CPPUNIT_ASSERT_EQUAL(false, reply2->wasFound());
+}
+
+// Verifies that the stor-communicationmanager pending-count/size limits are
+// propagated to messagebus at startup, and that live reconfig updates the
+// pending count, for either a content node or a distributor node.
+void
+CommunicationManagerTest::doTestConfigPropagation(bool isContentNode)
+{
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig config(getStandardConfig(isContentNode));
+ config.getConfig("stor-server").set("node_index", "1");
+ auto& cfg = config.getConfig("stor-communicationmanager");
+ cfg.set("mbus_content_node_max_pending_count", "12345");
+ cfg.set("mbus_content_node_max_pending_size", "555666");
+ cfg.set("mbus_distributor_node_max_pending_count", "6789");
+ cfg.set("mbus_distributor_node_max_pending_size", "777888");
+ addSlobrokConfig(config, slobrok);
+
+ std::unique_ptr<TestStorageApp> node;
+ if (isContentNode) {
+ node = std::make_unique<TestServiceLayerApp>(config.getConfigId());
+ } else {
+ node = std::make_unique<TestDistributorApp>(config.getConfigId());
+ }
+
+ CommunicationManager commMgr(node->getComponentRegister(),
+ config.getConfigId());
+ // Ownership transfers to commMgr via push_back.
+ DummyStorageLink *storageLink = new DummyStorageLink();
+ commMgr.push_back(std::unique_ptr<StorageLink>(storageLink));
+ commMgr.open();
+
+ // Outer type is RPCMessageBus, which wraps regular MessageBus.
+ auto& mbus = commMgr.getMessageBus().getMessageBus();
+ if (isContentNode) {
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12345), mbus.getMaxPendingCount());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(555666), mbus.getMaxPendingSize());
+ } else {
+ CPPUNIT_ASSERT_EQUAL(uint32_t(6789), mbus.getMaxPendingCount());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(777888), mbus.getMaxPendingSize());
+ }
+
+ // Test live reconfig of limits.
+ using ConfigBuilder
+ = vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
+ auto liveCfg = std::make_unique<ConfigBuilder>();
+ liveCfg->mbusContentNodeMaxPendingCount = 777777;
+ liveCfg->mbusDistributorNodeMaxPendingCount = 999999;
+
+ commMgr.configure(std::move(liveCfg));
+ if (isContentNode) {
+ CPPUNIT_ASSERT_EQUAL(uint32_t(777777), mbus.getMaxPendingCount());
+ } else {
+ CPPUNIT_ASSERT_EQUAL(uint32_t(999999), mbus.getMaxPendingCount());
+ }
+}
+
+// Distributor-node variant of the config propagation test.
+void
+CommunicationManagerTest::testDistPendingLimitConfigsArePropagatedToMessageBus()
+{
+ doTestConfigPropagation(false);
+}
+
+// Content-node variant of the config propagation test.
+void
+CommunicationManagerTest::testStorPendingLimitConfigsArePropagatedToMessageBus()
+{
+ doTestConfigPropagation(true);
+}
+
+// Commands enqueued before the link chain is opened must be dequeued in
+// strictly ascending priority order (lower number = higher priority).
+void
+CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
+{
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig storConfig(getStandardConfig(true));
+ storConfig.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(storConfig, slobrok);
+ TestServiceLayerApp storNode(storConfig.getConfigId());
+
+ CommunicationManager storage(storNode.getComponentRegister(),
+ storConfig.getConfigId());
+ // Ownership transfers to `storage` via push_back.
+ DummyStorageLink *storageLink = new DummyStorageLink();
+ storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+
+ // Message dequeing does not start before we invoke `open` on the storage
+ // link chain, so we enqueue messages in randomized priority order before
+ // doing so. After starting the thread, we should then get messages down
+ // the chain in a deterministic, prioritized order.
+ // Lower number == higher priority.
+ std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
+ for (auto pri : pris) {
+ storage.enqueue(createDummyCommand(pri));
+ }
+ storage.open();
+ storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
+
+ std::sort(pris.begin(), pris.end());
+ for (size_t i = 0; i < pris.size(); ++i) {
+ // Casting is just to avoid getting mismatched values printed to the
+ // output verbatim as chars.
+ CPPUNIT_ASSERT_EQUAL(
+ uint32_t(pris[i]),
+ uint32_t(storageLink->getCommand(i)->getPriority()));
+ }
+}
+
+// Replies, unlike commands, must be dequeued in the order they were enqueued
+// (FIFO), regardless of their priority values.
+void
+CommunicationManagerTest::testRepliesAreDequeuedInFifoOrder()
+{
+ mbus::Slobrok slobrok;
+ vdstestlib::DirConfig storConfig(getStandardConfig(true));
+ storConfig.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(storConfig, slobrok);
+ TestServiceLayerApp storNode(storConfig.getConfigId());
+
+ CommunicationManager storage(storNode.getComponentRegister(),
+ storConfig.getConfigId());
+ // Ownership transfers to `storage` via push_back.
+ DummyStorageLink *storageLink = new DummyStorageLink();
+ storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+
+ // Enqueue replies (not commands) in non-sorted priority order.
+ std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
+ for (auto pri : pris) {
+ storage.enqueue(createDummyCommand(pri)->makeReply());
+ }
+ storage.open();
+ storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
+
+ // Want FIFO order for replies, not priority-sorted order.
+ for (size_t i = 0; i < pris.size(); ++i) {
+ CPPUNIT_ASSERT_EQUAL(
+ uint32_t(pris[i]),
+ uint32_t(storageLink->getCommand(i)->getPriority()));
+ }
+}
+
+} // storage
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
new file mode 100644
index 00000000000..69083352c4a
--- /dev/null
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -0,0 +1,529 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <vespa/document/config/config-documenttypes.h>
+#include <vespa/document/datatype/datatype.h>
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/storage/storageserver/documentapiconverter.h>
+#include <vespa/storageapi/message/batch.h>
+#include <vespa/storageapi/message/datagram.h>
+#include <vespa/storageapi/message/multioperation.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/visitor.h>
+#include <vespa/vdslib/container/writabledocumentlist.h>
+
+using document::DataType;
+using document::DocIdString;
+using document::Document;
+using document::DocumentId;
+using document::DocumentTypeRepo;
+using document::readDocumenttypesConfig;
+
+namespace storage {
+
+// CppUnit fixture for DocumentApiConverter: conversions between documentapi
+// (messagebus) messages/replies and storageapi commands/replies, both ways.
+struct DocumentApiConverterTest : public CppUnit::TestFixture
+{
+ std::unique_ptr<DocumentApiConverter> _converter;
+ // Type repo loaded from config-doctypes.cfg; shared by all tests.
+ const DocumentTypeRepo::SP _repo;
+ // The "text/html" document type from the repo, used as test payload type.
+ const DataType& _html_type;
+
+ DocumentApiConverterTest()
+ : _repo(new DocumentTypeRepo(readDocumenttypesConfig(
+ "config-doctypes.cfg"))),
+ _html_type(*_repo->getDocumentType("text/html"))
+ {
+ }
+
+ // Fresh converter per test case.
+ void setUp() {
+ _converter.reset(new DocumentApiConverter("raw:"));
+ };
+
+ void testPut();
+ void testForwardedPut();
+ void testRemove();
+ void testGet();
+ void testCreateVisitor();
+ void testCreateVisitorHighTimeout();
+ void testCreateVisitorReplyNotReady();
+ void testCreateVisitorReplyLastBucket();
+ void testDestroyVisitor();
+ void testVisitorInfo();
+ void testDocBlock();
+ void testDocBlockWithKeepTimeStamps();
+ void testMultiOperation();
+ void testBatchDocumentUpdate();
+
+ CPPUNIT_TEST_SUITE(DocumentApiConverterTest);
+ CPPUNIT_TEST(testPut);
+ CPPUNIT_TEST(testForwardedPut);
+ CPPUNIT_TEST(testRemove);
+ CPPUNIT_TEST(testGet);
+ CPPUNIT_TEST(testCreateVisitor);
+ CPPUNIT_TEST(testCreateVisitorHighTimeout);
+ CPPUNIT_TEST(testCreateVisitorReplyNotReady);
+ CPPUNIT_TEST(testCreateVisitorReplyLastBucket);
+ CPPUNIT_TEST(testDestroyVisitor);
+ CPPUNIT_TEST(testVisitorInfo);
+ CPPUNIT_TEST(testDocBlock);
+ CPPUNIT_TEST(testDocBlockWithKeepTimeStamps);
+ CPPUNIT_TEST(testMultiOperation);
+ CPPUNIT_TEST(testBatchDocumentUpdate);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(DocumentApiConverterTest);
+
+// Round-trips a Put: documentapi -> storageapi command, reply conversion, and
+// storageapi -> documentapi; document pointer and timestamp must survive.
+void DocumentApiConverterTest::testPut()
+{
+ Document::SP
+ doc(new Document(_html_type, DocumentId(DocIdString("test", "test"))));
+
+ documentapi::PutDocumentMessage putmsg(doc);
+ putmsg.setTimestamp(1234);
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(putmsg, _repo);
+
+ api::PutCommand* pc = dynamic_cast<api::PutCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(pc);
+ // The converted command must share the same document instance, not a copy.
+ CPPUNIT_ASSERT(pc->getDocument().get() == doc.get());
+
+ std::unique_ptr<mbus::Reply> reply = putmsg.createReply();
+ CPPUNIT_ASSERT(reply.get());
+
+ std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI(
+ static_cast<documentapi::DocumentReply&>(*reply), *cmd);
+ api::PutReply* pr = dynamic_cast<api::PutReply*>(rep.get());
+ CPPUNIT_ASSERT(pr);
+
+ // Convert back to documentapi and check the round trip.
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(*pc, _repo);
+
+ documentapi::PutDocumentMessage* mbusput = dynamic_cast<documentapi::PutDocumentMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusput);
+ CPPUNIT_ASSERT(mbusput->getDocument().get() == doc.get());
+ CPPUNIT_ASSERT(mbusput->getTimestamp() == 1234);
+};
+
+// Transfers reply state from a storageapi PutReply back onto a documentapi
+// reply that still carries the original Put message.
+void DocumentApiConverterTest::testForwardedPut()
+{
+ Document::SP
+ doc(new Document(_html_type, DocumentId(DocIdString("test", "test"))));
+
+ // Raw pointer is safe: ownership of putmsg is handed to `reply` below
+ // via setMessage.
+ documentapi::PutDocumentMessage* putmsg = new documentapi::PutDocumentMessage(doc);
+ std::unique_ptr<mbus::Reply> reply(((documentapi::DocumentMessage*)putmsg)->createReply());
+ reply->setMessage(std::unique_ptr<mbus::Message>(putmsg));
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(*putmsg, _repo);
+ ((storage::api::PutCommand*)cmd.get())->setTimestamp(1234);
+
+ std::unique_ptr<storage::api::StorageReply> rep = cmd->makeReply();
+ api::PutReply* pr = dynamic_cast<api::PutReply*>(rep.get());
+ CPPUNIT_ASSERT(pr);
+
+ // Must not crash/corrupt when copying state onto the forwarded reply.
+ _converter->transferReplyState(*pr, *reply);
+}
+
+// Round-trips a Remove: documentapi -> storageapi, reply conversion, and back
+// to documentapi; the document id must survive both directions.
+void DocumentApiConverterTest::testRemove()
+{
+ documentapi::RemoveDocumentMessage removemsg(document::DocumentId(document::DocIdString("test", "test")));
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(removemsg, _repo);
+
+ api::RemoveCommand* rc = dynamic_cast<api::RemoveCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(rc);
+ CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), rc->getDocumentId());
+
+ std::unique_ptr<mbus::Reply> reply = removemsg.createReply();
+ CPPUNIT_ASSERT(reply.get());
+
+ std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI(
+ static_cast<documentapi::DocumentReply&>(*reply), *cmd);
+ api::RemoveReply* pr = dynamic_cast<api::RemoveReply*>(rep.get());
+ CPPUNIT_ASSERT(pr);
+
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(*rc, _repo);
+
+ documentapi::RemoveDocumentMessage* mbusremove = dynamic_cast<documentapi::RemoveDocumentMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusremove);
+ CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), mbusremove->getDocumentId());
+};
+
+// Converts a documentapi Get to a storageapi GetCommand; document id and
+// field set string must be carried over.
+void DocumentApiConverterTest::testGet()
+{
+ documentapi::GetDocumentMessage getmsg(
+ document::DocumentId(document::DocIdString("test", "test")),
+ "foo bar");
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(getmsg, _repo);
+
+ api::GetCommand* rc = dynamic_cast<api::GetCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(rc);
+ CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), rc->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("foo bar"), rc->getFieldSet());
+};
+
+// Converts a documentapi CreateVisitor to storageapi; library, instance,
+// destinations and remaining time must be carried over.
+void DocumentApiConverterTest::testCreateVisitor()
+{
+ documentapi::CreateVisitorMessage cv(
+ "mylib",
+ "myinstance",
+ "control-dest",
+ "data-dest");
+
+ cv.setTimeRemaining(123456);
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(cv, _repo);
+
+ api::CreateVisitorCommand* pc = dynamic_cast<api::CreateVisitorCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(pc);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("mylib"), pc->getLibraryName());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("control-dest"), pc->getControlDestination());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("data-dest"), pc->getDataDestination());
+ CPPUNIT_ASSERT_EQUAL(123456u, pc->getTimeout());
+}
+
+// A remaining time larger than uint32 range must be clamped to INT32_MAX in
+// the converted command's timeout rather than wrapping around.
+void DocumentApiConverterTest::testCreateVisitorHighTimeout()
+{
+ documentapi::CreateVisitorMessage cv(
+ "mylib",
+ "myinstance",
+ "control-dest",
+ "data-dest");
+
+ cv.setTimeRemaining((uint64_t)std::numeric_limits<uint32_t>::max() + 1); // Will be INT_MAX
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(cv, _repo);
+
+ api::CreateVisitorCommand* pc = dynamic_cast<api::CreateVisitorCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(pc);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("mylib"), pc->getLibraryName());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("control-dest"), pc->getControlDestination());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("data-dest"), pc->getDataDestination());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) std::numeric_limits<int32_t>::max(),
+ pc->getTimeout());
+}
+
+// A NOT_READY storageapi result must map to documentapi's
+// ERROR_NODE_NOT_READY, and the last bucket defaults to BucketId(INT_MAX).
+void DocumentApiConverterTest::testCreateVisitorReplyNotReady()
+{
+ documentapi::CreateVisitorMessage cv(
+ "mylib",
+ "myinstance",
+ "control-dest",
+ "data-dest");
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(cv, _repo);
+ CPPUNIT_ASSERT(cmd.get());
+ api::CreateVisitorCommand& cvc = dynamic_cast<api::CreateVisitorCommand&>(*cmd);
+
+ api::CreateVisitorReply cvr(cvc);
+ cvr.setResult(api::ReturnCode(api::ReturnCode::NOT_READY, "not ready"));
+
+ std::unique_ptr<documentapi::CreateVisitorReply> reply(
+ dynamic_cast<documentapi::CreateVisitorReply*>(
+ cv.createReply().release()));
+ CPPUNIT_ASSERT(reply.get());
+
+ _converter->transferReplyState(cvr, *reply);
+
+ CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::ERROR_NODE_NOT_READY, reply->getError(0).getCode());
+
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(INT_MAX), reply->getLastBucket());
+}
+
+
+void DocumentApiConverterTest::testCreateVisitorReplyLastBucket()
+{
+ documentapi::CreateVisitorMessage cv(
+ "mylib",
+ "myinstance",
+ "control-dest",
+ "data-dest");
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(cv, _repo);
+ CPPUNIT_ASSERT(cmd.get());
+ api::CreateVisitorCommand& cvc = dynamic_cast<api::CreateVisitorCommand&>(*cmd);
+
+
+ api::CreateVisitorReply cvr(cvc);
+ cvr.setLastBucket(document::BucketId(123));
+
+
+ std::unique_ptr<documentapi::CreateVisitorReply> reply(
+ dynamic_cast<documentapi::CreateVisitorReply*>(
+ cv.createReply().release()));
+
+ CPPUNIT_ASSERT(reply.get());
+
+ _converter->transferReplyState(cvr, *reply);
+
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(123), reply->getLastBucket());
+}
+
+
+void DocumentApiConverterTest::testDestroyVisitor()
+{
+ documentapi::DestroyVisitorMessage cv("myinstance");
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(cv, _repo);
+
+ api::DestroyVisitorCommand* pc = dynamic_cast<api::DestroyVisitorCommand*>(cmd.get());
+
+ CPPUNIT_ASSERT(pc);
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId());
+}
+
+void
+DocumentApiConverterTest::testVisitorInfo()
+{
+ api::VisitorInfoCommand vicmd;
+ std::vector<api::VisitorInfoCommand::BucketTimestampPair> bucketsCompleted;
+ bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 1), 0));
+ bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 2), 0));
+ bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 4), 0));
+
+ vicmd.setBucketsCompleted(bucketsCompleted);
+
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(vicmd, _repo);
+
+ documentapi::VisitorInfoMessage* mbusvi = dynamic_cast<documentapi::VisitorInfoMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusvi);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), mbusvi->getFinishedBuckets()[0]);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 2), mbusvi->getFinishedBuckets()[1]);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 4), mbusvi->getFinishedBuckets()[2]);
+
+ std::unique_ptr<mbus::Reply> reply = mbusvi->createReply();
+ CPPUNIT_ASSERT(reply.get());
+
+ std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI(
+ static_cast<documentapi::DocumentReply&>(*reply), vicmd);
+ api::VisitorInfoReply* pr = dynamic_cast<api::VisitorInfoReply*>(rep.get());
+ CPPUNIT_ASSERT(pr);
+}
+
+void
+DocumentApiConverterTest::testDocBlock()
+{
+ Document::SP
+ doc(new Document(_html_type, DocumentId(DocIdString("test", "test"))));
+
+ char buffer[10000];
+ vdslib::WritableDocumentList docBlock(_repo, buffer, sizeof(buffer));
+ docBlock.addPut(*doc, 100);
+
+ document::BucketIdFactory fac;
+ document::BucketId bucketId = fac.getBucketId(doc->getId());
+ bucketId.setUsedBits(32);
+
+ api::DocBlockCommand dbcmd(bucketId, docBlock, std::shared_ptr<void>());
+
+ dbcmd.setTimeout(123456);
+
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(dbcmd, _repo);
+
+ documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusdb);
+
+ CPPUNIT_ASSERT_EQUAL((uint64_t)123456, mbusdb->getTimeRemaining());
+
+ const vdslib::DocumentList& list = mbusdb->getOperations();
+ CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size());
+ CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get()));
+
+ std::unique_ptr<mbus::Reply> reply = mbusdb->createReply();
+ CPPUNIT_ASSERT(reply.get());
+
+ std::unique_ptr<storage::api::StorageReply> rep =
+ _converter->toStorageAPI(static_cast<documentapi::DocumentReply&>(*reply), dbcmd);
+ api::DocBlockReply* pr = dynamic_cast<api::DocBlockReply*>(rep.get());
+ CPPUNIT_ASSERT(pr);
+}
+
+
+void
+DocumentApiConverterTest::testDocBlockWithKeepTimeStamps()
+{
+ char buffer[10000];
+ vdslib::WritableDocumentList docBlock(_repo, buffer, sizeof(buffer));
+ api::DocBlockCommand dbcmd(document::BucketId(0), docBlock, std::shared_ptr<void>());
+
+ {
+ CPPUNIT_ASSERT_EQUAL(dbcmd.keepTimeStamps(), false);
+
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(dbcmd, _repo);
+
+ documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusdb);
+
+ CPPUNIT_ASSERT_EQUAL(mbusdb->keepTimeStamps(), false);
+ }
+
+ {
+ dbcmd.keepTimeStamps(true);
+ CPPUNIT_ASSERT_EQUAL(dbcmd.keepTimeStamps(), true);
+
+ std::unique_ptr<mbus::Message> mbusmsg =
+ _converter->toDocumentAPI(dbcmd, _repo);
+
+ documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get());
+ CPPUNIT_ASSERT(mbusdb);
+
+ CPPUNIT_ASSERT_EQUAL(mbusdb->keepTimeStamps(), true);
+ }
+
+}
+
+
+void
+DocumentApiConverterTest::testMultiOperation()
+{
+ //create a document
+ Document::SP
+ doc(new Document(_html_type, DocumentId(DocIdString("test", "test"))));
+
+ document::BucketIdFactory fac;
+ document::BucketId bucketId = fac.getBucketId(doc->getId());
+ bucketId.setUsedBits(32);
+
+ {
+ documentapi::MultiOperationMessage momsg(_repo, bucketId, 10000);
+
+ vdslib::WritableDocumentList operations(_repo, &(momsg.getBuffer()[0]),
+ momsg.getBuffer().size());
+ operations.addPut(*doc, 100);
+
+ momsg.setOperations(operations);
+
+ CPPUNIT_ASSERT(momsg.getBuffer().size() > 0);
+
+ // Convert it to Storage API
+ std::unique_ptr<api::StorageCommand> stcmd =
+ _converter->toStorageAPI(momsg, _repo);
+
+ api::MultiOperationCommand* mocmd = dynamic_cast<api::MultiOperationCommand*>(stcmd.get());
+ CPPUNIT_ASSERT(mocmd);
+ CPPUNIT_ASSERT(mocmd->getBuffer().size() > 0);
+
+ // Get operations from Storage API message and check document
+ const vdslib::DocumentList& list = mocmd->getOperations();
+ CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size());
+ CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get()));
+
+ // Create Storage API Reply
+ std::unique_ptr<api::MultiOperationReply> moreply = std::unique_ptr<api::MultiOperationReply>(new api::MultiOperationReply(*mocmd));
+ CPPUNIT_ASSERT(moreply.get());
+
+ // convert storage api reply to mbus reply.....
+ // ...
+ }
+
+ {
+ api::MultiOperationCommand mocmd(_repo, bucketId, 10000, false);
+ mocmd.getOperations().addPut(*doc, 100);
+
+ // Convert it to documentapi
+ std::unique_ptr<mbus::Message> mbmsg =
+ _converter->toDocumentAPI(mocmd, _repo);
+ documentapi::MultiOperationMessage* momsg = dynamic_cast<documentapi::MultiOperationMessage*>(mbmsg.get());
+ CPPUNIT_ASSERT(momsg);
+
+ // Get operations from Document API msg and check document
+ const vdslib::DocumentList& list = momsg->getOperations();
+ CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size());
+ CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get()));
+
+ // Create Document API reply
+ mbus::Reply::UP moreply = momsg->createReply();
+ CPPUNIT_ASSERT(moreply.get());
+
+ //Convert DocumentAPI reply to storageapi reply
+ std::unique_ptr<api::StorageReply> streply =
+ _converter->toStorageAPI(static_cast<documentapi::DocumentReply&>(*moreply), mocmd);
+ api::MultiOperationReply* mostreply = dynamic_cast<api::MultiOperationReply*>(streply.get());
+ CPPUNIT_ASSERT(mostreply);
+
+ }
+}
+
+void
+DocumentApiConverterTest::testBatchDocumentUpdate()
+{
+ std::vector<document::DocumentUpdate::SP > updates;
+
+ {
+ document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test1"));
+ document::DocumentUpdate::SP update(
+ new document::DocumentUpdate(_html_type, docId));
+ updates.push_back(update);
+ }
+
+ {
+ document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test2"));
+ document::DocumentUpdate::SP update(
+ new document::DocumentUpdate(_html_type, docId));
+ updates.push_back(update);
+ }
+
+ {
+ document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test3"));
+ document::DocumentUpdate::SP update(
+ new document::DocumentUpdate(_html_type, docId));
+ updates.push_back(update);
+ }
+
+ std::shared_ptr<documentapi::BatchDocumentUpdateMessage> msg(
+ new documentapi::BatchDocumentUpdateMessage(1234));
+ for (std::size_t i = 0; i < updates.size(); ++i) {
+ msg->addUpdate(updates[i]);
+ }
+
+ std::unique_ptr<storage::api::StorageCommand> cmd =
+ _converter->toStorageAPI(*msg, _repo);
+ api::BatchDocumentUpdateCommand* batchCmd = dynamic_cast<api::BatchDocumentUpdateCommand*>(cmd.get());
+ CPPUNIT_ASSERT(batchCmd);
+ CPPUNIT_ASSERT_EQUAL(updates.size(), batchCmd->getUpdates().size());
+ for (std::size_t i = 0; i < updates.size(); ++i) {
+ CPPUNIT_ASSERT_EQUAL(*updates[i], *batchCmd->getUpdates()[i]);
+ }
+
+ api::BatchDocumentUpdateReply batchReply(*batchCmd);
+ batchReply.getDocumentsNotFound().resize(3);
+ batchReply.getDocumentsNotFound()[0] = true;
+ batchReply.getDocumentsNotFound()[2] = true;
+
+ std::unique_ptr<mbus::Reply> mbusReply = msg->createReply();
+ documentapi::BatchDocumentUpdateReply* mbusBatchReply(
+ dynamic_cast<documentapi::BatchDocumentUpdateReply*>(mbusReply.get()));
+ CPPUNIT_ASSERT(mbusBatchReply != 0);
+
+ _converter->transferReplyState(batchReply, *mbusReply);
+
+ CPPUNIT_ASSERT_EQUAL(std::size_t(3), mbusBatchReply->getDocumentsNotFound().size());
+ CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[0] == true);
+ CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[1] == false);
+ CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[2] == true);
+}
+
+}
diff --git a/storage/src/tests/storageserver/dummystoragelink.cpp b/storage/src/tests/storageserver/dummystoragelink.cpp
new file mode 100644
index 00000000000..7194f1fba3d
--- /dev/null
+++ b/storage/src/tests/storageserver/dummystoragelink.cpp
@@ -0,0 +1,182 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
+#include <tests/common/dummystoragelink.h>
+#include <sys/time.h>
+
+namespace storage {
+
+DummyStorageLink* DummyStorageLink::_last(0);
+
+DummyStorageLink::DummyStorageLink()
+ : StorageLink("Dummy storage link"),
+ _commands(),
+ _replies(),
+ _injected(),
+ _autoReply(false),
+ _useDispatch(false),
+ _ignore(false),
+ _waitMonitor()
+{
+ _last = this;
+}
+
+DummyStorageLink::~DummyStorageLink()
+{
+ // Often a chain with dummy link on top is deleted in unit tests.
+ // If they haven't been closed already, close them for a cleaner
+ // shutdown
+ if (getState() == OPENED) {
+ close();
+ flush();
+ }
+ closeNextLink();
+ reset();
+}
+
+bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd)
+{
+ if (_ignore) {
+ return false;
+ }
+ if (_injected.size() > 0) {
+ vespalib::LockGuard guard(_lock);
+ sendUp(*_injected.begin());
+ _injected.pop_front();
+ } else if (_autoReply) {
+ if (!cmd->getType().isReply()) {
+ std::shared_ptr<api::StorageReply> reply(
+ std::dynamic_pointer_cast<api::StorageCommand>(cmd)
+ ->makeReply().release());
+ reply->setResult(api::ReturnCode(
+ api::ReturnCode::OK, "Automatically generated reply"));
+ sendUp(reply);
+ }
+ }
+ if (isBottom()) {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ {
+ vespalib::LockGuard guard(_lock);
+ _commands.push_back(cmd);
+ }
+ lock.broadcast();
+ return true;
+ }
+ return StorageLink::onDown(cmd);
+}
+
+bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) {
+ if (isTop()) {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ {
+ vespalib::LockGuard guard(_lock);
+ _replies.push_back(reply);
+ }
+ lock.broadcast();
+ return true;
+ }
+ return StorageLink::onUp(reply);
+
+}
+
+void DummyStorageLink::injectReply(api::StorageReply* reply)
+{
+ assert(reply);
+ vespalib::LockGuard guard(_lock);
+ _injected.push_back(std::shared_ptr<api::StorageReply>(reply));
+}
+
+void DummyStorageLink::reset() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ vespalib::LockGuard guard(_lock);
+ _commands.clear();
+ _replies.clear();
+ _injected.clear();
+}
+
+void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
+{
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
+ vespalib::MonitorGuard lock(_waitMonitor);
+ while (_commands.size() + _replies.size() < msgCount) {
+ if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ std::ostringstream ost;
+ ost << "Timed out waiting for " << msgCount << " messages to "
+ << "arrive in dummy storage link. Only "
+ << (_commands.size() + _replies.size()) << " messages seen "
+                << "after timeout of " << timeout << " seconds was reached.";
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+ }
+ if (timeout >= 0) {
+ lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ } else {
+ lock.wait();
+ }
+ }
+}
+
+void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
+{
+ framework::defaultimplementation::RealClock clock;
+ framework::MilliSecTime endTime(
+ clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
+ vespalib::MonitorGuard lock(_waitMonitor);
+ while (true) {
+ for (uint32_t i=0; i<_commands.size(); ++i) {
+ if (_commands[i]->getType() == type) return;
+ }
+ for (uint32_t i=0; i<_replies.size(); ++i) {
+ if (_replies[i]->getType() == type) return;
+ }
+ if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ std::ostringstream ost;
+ ost << "Timed out waiting for " << type << " message to "
+ << "arrive in dummy storage link. Only "
+ << (_commands.size() + _replies.size()) << " messages seen "
+                << "after timeout of " << timeout << " seconds was reached.";
+ if (_commands.size() == 1) {
+ ost << " Found command of type " << _commands[0]->getType();
+ }
+ if (_replies.size() == 1) {
+                ost << " Found reply of type " << _replies[0]->getType();
+ }
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+ }
+ if (timeout >= 0) {
+ lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ } else {
+ lock.wait();
+ }
+ }
+}
+
+api::StorageMessage::SP
+DummyStorageLink::getAndRemoveMessage(const api::MessageType& type)
+{
+ vespalib::MonitorGuard lock(_waitMonitor);
+ for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin();
+ it != _commands.end(); ++it)
+ {
+ if ((*it)->getType() == type) {
+ api::StorageMessage::SP result(*it);
+ _commands.erase(it);
+ return result;
+ }
+ }
+ for (std::vector<api::StorageMessage::SP>::iterator it = _replies.begin();
+ it != _replies.end(); ++it)
+ {
+ if ((*it)->getType() == type) {
+ api::StorageMessage::SP result(*it);
+ _replies.erase(it);
+ return result;
+ }
+ }
+ std::ostringstream ost;
+ ost << "No message of type " << type << " found.";
+ throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
+}
+
+} // storage
diff --git a/storage/src/tests/storageserver/dummystoragelink.h b/storage/src/tests/storageserver/dummystoragelink.h
new file mode 100644
index 00000000000..cb9df8c5642
--- /dev/null
+++ b/storage/src/tests/storageserver/dummystoragelink.h
@@ -0,0 +1,115 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/sync.h>
+#include <list>
+#include <sstream>
+#include <vespa/storageapi/messageapi/storagecommand.h>
+#include <string>
+#include <vector>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/common/bucketmessages.h>
+#include <vespa/storageapi/message/internal.h>
+
+class FastOS_ThreadPool;
+
+namespace storage {
+
+class DummyStorageLink : public StorageLink {
+
+ mutable vespalib::Lock _lock; // to protect below containers:
+ std::vector<api::StorageMessage::SP> _commands;
+ std::vector<api::StorageMessage::SP> _replies;
+ std::list<api::StorageMessage::SP> _injected;
+
+ bool _autoReply;
+ bool _useDispatch;
+ bool _ignore;
+ static DummyStorageLink* _last;
+ vespalib::Monitor _waitMonitor;
+
+public:
+ DummyStorageLink();
+ ~DummyStorageLink();
+
+ bool onDown(const api::StorageMessage::SP&);
+ bool onUp(const api::StorageMessage::SP&);
+
+ void addOnTopOfChain(StorageLink& link) {
+ link.addTestLinkOnTop(this);
+ }
+
+ void print(std::ostream& ost, bool verbose, const std::string& indent) const
+ {
+ (void) verbose;
+ ost << indent << "DummyStorageLink("
+ << "autoreply = " << (_autoReply ? "on" : "off")
+ << ", dispatch = " << (_useDispatch ? "on" : "off")
+ << ", " << _commands.size() << " commands"
+ << ", " << _replies.size() << " replies";
+ if (_injected.size() > 0)
+ ost << ", " << _injected.size() << " injected";
+ ost << ")";
+ }
+
+ void injectReply(api::StorageReply* reply);
+ void reset();
+ void setAutoreply(bool autoReply) { _autoReply = autoReply; }
+ void setIgnore(bool ignore) { _ignore = ignore; }
+ // Timeout is given in seconds
+ void waitForMessages(unsigned int msgCount = 1, int timeout = -1);
+ // Wait for a single message of a given type
+ void waitForMessage(const api::MessageType&, int timeout = -1);
+
+ api::StorageMessage::SP getCommand(size_t i) const {
+ vespalib::LockGuard guard(_lock);
+ api::StorageMessage::SP ret = _commands[i];
+ return ret;
+ }
+ api::StorageMessage::SP getReply(size_t i) const {
+ vespalib::LockGuard guard(_lock);
+ api::StorageMessage::SP ret = _replies[i];
+ return ret;
+ }
+ size_t getNumCommands() const {
+ vespalib::LockGuard guard(_lock);
+ return _commands.size();
+ }
+ size_t getNumReplies() const {
+ vespalib::LockGuard guard(_lock);
+ return _replies.size();
+ }
+
+ const std::vector<api::StorageMessage::SP>& getCommands() const
+ { return _commands; }
+ const std::vector<api::StorageMessage::SP>& getReplies() const
+ { return _replies; }
+
+ std::vector<api::StorageMessage::SP> getCommandsOnce() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ std::vector<api::StorageMessage::SP> retval;
+ {
+ vespalib::LockGuard guard(_lock);
+ retval.swap(_commands);
+ }
+ return retval;
+ }
+
+ std::vector<api::StorageMessage::SP> getRepliesOnce() {
+ vespalib::MonitorGuard lock(_waitMonitor);
+ std::vector<api::StorageMessage::SP> retval;
+ {
+ vespalib::LockGuard guard(_lock);
+ retval.swap(_replies);
+ }
+ return retval;
+ }
+
+ api::StorageMessage::SP getAndRemoveMessage(const api::MessageType&);
+
+ static DummyStorageLink* getLast() { return _last; }
+};
+
+}
+
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
new file mode 100644
index 00000000000..e705db80788
--- /dev/null
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -0,0 +1,1566 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <memory>
+#include <iterator>
+#include <vector>
+#include <algorithm>
+#include <ctime>
+#include <vespa/vespalib/util/document_runnable.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/storagelinktest.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
+#include <vespa/storage/persistence/messages.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/message/state.h>
+
+using namespace document;
+using namespace storage::api;
+
+namespace storage {
+
+namespace {
+
+struct MergeBuilder
+{
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
+ uint64_t _clusterStateVersion;
+
+ MergeBuilder(const document::BucketId& bucket)
+ : _bucket(bucket),
+ _maxTimestamp(1234),
+ _chain(),
+ _clusterStateVersion(1)
+ {
+ nodes(0, 1, 2);
+ }
+
+ MergeBuilder& nodes(uint16_t n0) {
+ _nodes.push_back(n0);
+ return *this;
+ }
+ MergeBuilder& nodes(uint16_t n0, uint16_t n1) {
+ _nodes.push_back(n0);
+ _nodes.push_back(n1);
+ return *this;
+ }
+ MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _nodes.push_back(n0);
+ _nodes.push_back(n1);
+ _nodes.push_back(n2);
+ return *this;
+ }
+ MergeBuilder& maxTimestamp(api::Timestamp maxTs) {
+ _maxTimestamp = maxTs;
+ return *this;
+ }
+ MergeBuilder& clusterStateVersion(uint64_t csv) {
+ _clusterStateVersion = csv;
+ return *this;
+ }
+ MergeBuilder& chain(uint16_t n0) {
+ _chain.clear();
+ _chain.push_back(n0);
+ return *this;
+ }
+ MergeBuilder& chain(uint16_t n0, uint16_t n1) {
+ _chain.clear();
+ _chain.push_back(n0);
+ _chain.push_back(n1);
+ return *this;
+ }
+ MergeBuilder& chain(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _chain.clear();
+ _chain.push_back(n0);
+ _chain.push_back(n1);
+ _chain.push_back(n2);
+ return *this;
+ }
+
+ api::MergeBucketCommand::SP create() const {
+ std::vector<api::MergeBucketCommand::Node> n;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ n.push_back(_nodes[i]);
+ }
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(_bucket, n, _maxTimestamp,
+ _clusterStateVersion, _chain));
+ StorageMessageAddress address("storage", lib::NodeType::STORAGE, _nodes[0]);
+ cmd->setAddress(address);
+ return cmd;
+ }
+};
+
+std::shared_ptr<api::SetSystemStateCommand>
+makeSystemStateCmd(const std::string& state)
+{
+ return std::make_shared<api::SetSystemStateCommand>(
+ lib::ClusterState(state));
+}
+
+} // anon ns
+
+class MergeThrottlerTest : public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE(MergeThrottlerTest);
+ CPPUNIT_TEST(testMergesConfig);
+ CPPUNIT_TEST(testChain);
+ CPPUNIT_TEST(testWithSourceOnlyNode);
+ CPPUNIT_TEST(test42DistributorBehavior);
+ CPPUNIT_TEST(test42DistributorBehaviorDoesNotTakeOwnership);
+ CPPUNIT_TEST(testEndOfChainExecutionDoesNotTakeOwnership);
+ CPPUNIT_TEST(testResendHandling);
+ CPPUNIT_TEST(testPriorityQueuing);
+ CPPUNIT_TEST(testCommandInQueueDuplicateOfKnownMerge);
+ CPPUNIT_TEST(testInvalidReceiverNode);
+ CPPUNIT_TEST(testForwardQueuedMerge);
+ CPPUNIT_TEST(testExecuteQueuedMerge);
+ CPPUNIT_TEST(testFlush);
+ CPPUNIT_TEST(testUnseenMergeWithNodeInChain);
+ CPPUNIT_TEST(testMergeWithNewerClusterStateFlushesOutdatedQueued);
+ CPPUNIT_TEST(testUpdatedClusterStateFlushesOutdatedQueued);
+ CPPUNIT_TEST(test42MergesDoNotTriggerFlush);
+ CPPUNIT_TEST(testOutdatedClusterStateMergesAreRejectedOnArrival);
+ CPPUNIT_TEST(testUnknownMergeWithSelfInChain);
+ CPPUNIT_TEST(testBusyReturnedOnFullQueue);
+ CPPUNIT_TEST(testBrokenCycle);
+ CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected);
+ CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected);
+ CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges);
+ CPPUNIT_TEST_SUITE_END();
+public:
+ void setUp();
+ void tearDown();
+
+ void testMergesConfig();
+ void testChain();
+ void testWithSourceOnlyNode();
+ void test42DistributorBehavior();
+ void test42DistributorBehaviorDoesNotTakeOwnership();
+ void testEndOfChainExecutionDoesNotTakeOwnership();
+ void testResendHandling();
+ void testPriorityQueuing();
+ void testCommandInQueueDuplicateOfKnownMerge();
+ void testInvalidReceiverNode();
+ void testForwardQueuedMerge();
+ void testExecuteQueuedMerge();
+ void testFlush();
+ void testUnseenMergeWithNodeInChain();
+ void testMergeWithNewerClusterStateFlushesOutdatedQueued();
+ void testUpdatedClusterStateFlushesOutdatedQueued();
+ void test42MergesDoNotTriggerFlush();
+ void testOutdatedClusterStateMergesAreRejectedOnArrival();
+ void testUnknownMergeWithSelfInChain();
+ void testBusyReturnedOnFullQueue();
+ void testBrokenCycle();
+ void testGetBucketDiffCommandNotInActiveSetIsRejected();
+ void testApplyBucketDiffCommandNotInActiveSetIsRejected();
+ void testNewClusterStateAbortsAllOutdatedActiveMerges();
+private:
+ static const int _storageNodeCount = 3;
+ static const int _messageWaitTime = 100;
+
+ // Using n storage node links and dummy servers
+ std::vector<std::shared_ptr<DummyStorageLink> > _topLinks;
+ std::vector<std::shared_ptr<TestServiceLayerApp> > _servers;
+ std::vector<MergeThrottler*> _throttlers;
+ std::vector<DummyStorageLink*> _bottomLinks;
+
+ api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
+
+ void sendAndExpectReply(
+ const std::shared_ptr<api::StorageMessage>& msg,
+ const api::MessageType& expectedReplyType,
+ api::ReturnCode::Result expectedResultCode);
+};
+
+const int MergeThrottlerTest::_storageNodeCount;
+const int MergeThrottlerTest::_messageWaitTime;
+
+CPPUNIT_TEST_SUITE_REGISTRATION(MergeThrottlerTest);
+
+void
+MergeThrottlerTest::setUp()
+{
+ vdstestlib::DirConfig config(getStandardConfig(true));
+
+ for (int i = 0; i < _storageNodeCount; ++i) {
+ std::unique_ptr<TestServiceLayerApp> server(
+ new TestServiceLayerApp(DiskCount(1), NodeIndex(i)));
+ server->setClusterState(lib::ClusterState(
+ "distributor:100 storage:100 version:1"));
+ std::unique_ptr<DummyStorageLink> top;
+
+ top.reset(new DummyStorageLink);
+ MergeThrottler* throttler = new MergeThrottler(config.getConfigId(), server->getComponentRegister());
+ // MergeThrottler will be sandwiched in between two dummy links
+ top->push_back(std::unique_ptr<StorageLink>(throttler));
+ DummyStorageLink* bottom = new DummyStorageLink;
+ throttler->push_back(std::unique_ptr<StorageLink>(bottom));
+
+ _servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
+ _throttlers.push_back(throttler);
+ _bottomLinks.push_back(bottom);
+ top->open();
+ _topLinks.push_back(std::shared_ptr<DummyStorageLink>(top.release()));
+ }
+}
+
+void
+MergeThrottlerTest::tearDown()
+{
+ for (std::size_t i = 0; i < _topLinks.size(); ++i) {
+ if (_topLinks[i]->getState() == StorageLink::OPENED) {
+ _topLinks[i]->close();
+ _topLinks[i]->flush();
+ }
+ _topLinks[i] = std::shared_ptr<DummyStorageLink>();
+ }
+ _topLinks.clear();
+ _bottomLinks.clear();
+ _throttlers.clear();
+ _servers.clear();
+}
+
+namespace {
+
+template <typename Iterator>
+bool
+checkChain(const StorageMessage::SP& msg,
+ Iterator first, Iterator end)
+{
+ const MergeBucketCommand& cmd =
+ dynamic_cast<const MergeBucketCommand&>(*msg);
+
+ if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) {
+ return false;
+ }
+
+ return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
+}
+
+void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
+{
+ std::time_t start = std::time(0);
+ while (true) {
+ std::size_t count;
+ {
+ vespalib::LockGuard lock(throttler.getStateLock());
+ count = throttler.getMergeQueue().size();
+ }
+ if (count == sz) {
+ break;
+ }
+ std::time_t now = std::time(0);
+ if (now - start > timeout) {
+ std::ostringstream os;
+ os << "Timeout while waiting for merge queue with " << sz << " items. Had "
+ << count << " at timeout.";
+ throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC);
+ }
+ FastOS_Thread::Sleep(1);
+ }
+}
+
+}
+
+// Extremely simple test that just checks that (min|max)_merges_per_node
+// under the stor-server config gets propagated to all the nodes
+void
+MergeThrottlerTest::testMergesConfig()
+{
+ for (int i = 0; i < _storageNodeCount; ++i) {
+ CPPUNIT_ASSERT_EQUAL(uint32_t(25), _throttlers[i]->getThrottlePolicy().getMaxPendingCount());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(20), _throttlers[i]->getMaxQueueSize());
+ }
+}
+
+// Test that a distributor sending a merge to the lowest-index storage
+// node correctly invokes a merge forwarding chain and subsequent unwind.
+void
+MergeThrottlerTest::testChain()
+{
+ uint16_t indices[_storageNodeCount];
+ for (int i = 0; i < _storageNodeCount; ++i) {
+ indices[i] = i;
+ _servers[i]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:123"));
+ }
+
+ BucketId bid(14, 0x1337);
+
+ // Use different node permutations to ensure it works no matter which node is
+ // set as the executor. More specifically, _all_ permutations.
+ do {
+ uint16_t lastNodeIdx = _storageNodeCount - 1;
+ uint16_t executorNode = indices[0];
+
+ //std::cout << "\n----\n";
+ std::vector<MergeBucketCommand::Node> nodes;
+ for (int i = 0; i < _storageNodeCount; ++i) {
+ nodes.push_back(MergeBucketCommand::Node(indices[i], (i + executorNode) % 2 == 0));
+ //std::cout << indices[i] << " ";
+ }
+ //std::cout << "\n";
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(bid, nodes, UINT_MAX, 123));
+ cmd->setPriority(7);
+ cmd->setTimeout(54321);
+ StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+ cmd->setAddress(address);
+ const uint16_t distributorIndex = 123;
+ cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+
+ StorageMessage::SP fwd = cmd;
+ StorageMessage::SP fwdToExec;
+
+ // TODO: make generic wrt. _storageNodeCount
+
+ for (int i = 0; i < _storageNodeCount - 1; ++i) {
+ if (i == executorNode) {
+ fwdToExec = fwd;
+ }
+ CPPUNIT_ASSERT_EQUAL(uint16_t(i), _servers[i]->getIndex());
+ // No matter the node order, command is always sent to node 0 -> 1 -> 2 etc
+ _topLinks[i]->sendDown(fwd);
+ _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ //std::cout << "fwd " << i << " -> " << i+1 << "\n";
+
+ // Forwarded merge should not be sent down. Should not be necessary
+ // to lock throttler here, since it should be sleeping like a champion
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[i]->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[i]->getActiveMerges().size());
+
+ fwd = _topLinks[i]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(i + 1), fwd->getAddress()->getIndex());
+ CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex());
+ {
+ //uint16_t chain[] = { 0 };
+ std::vector<uint16_t> chain;
+ for (int j = 0; j <= i; ++j) {
+ chain.push_back(j);
+ }
+ CPPUNIT_ASSERT(checkChain(fwd, chain.begin(), chain.end()));
+ }
+ // Ensure priority, cluster state version and timeout is correctly forwarded
+ CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority()));
+ CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ }
+
+ _topLinks[lastNodeIdx]->sendDown(fwd);
+
+ // If node 2 is the first in the node list, it should immediately execute
+ // the merge. Otherwise, a cycle with the first node should be formed.
+ if (executorNode != lastNodeIdx) {
+ //std::cout << "cycle " << lastNodeIdx << " -> " << executorNode << "\n";
+ _topLinks[lastNodeIdx]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ // Forwarded merge should not be sent down
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[lastNodeIdx]->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[lastNodeIdx]->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[lastNodeIdx]->getActiveMerges().size());
+
+ fwd = _topLinks[lastNodeIdx]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(executorNode), fwd->getAddress()->getIndex());
+ CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex());
+ {
+ std::vector<uint16_t> chain;
+ for (int j = 0; j < _storageNodeCount; ++j) {
+ chain.push_back(j);
+ }
+ CPPUNIT_ASSERT(checkChain(fwd, chain.begin(), chain.end()));
+ }
+ CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority()));
+ CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+
+ _topLinks[executorNode]->sendDown(fwd);
+ }
+
+ _bottomLinks[executorNode]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+ // Forwarded merge has now been sent down to persistence layer
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[executorNode]->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[executorNode]->getNumReplies()); // No reply sent yet
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size()); // no re-registering merge
+
+ if (executorNode != lastNodeIdx) {
+ // The MergeBucketCommand that is kept in the executor node should
+ // be the one from the node it initially got it from, NOT the one
+ // from the last node, since the chain has looped
+ CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bid)
+ != _throttlers[executorNode]->getActiveMerges().end());
+ CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()),
+ _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get());
+ }
+
+ // Send reply up from persistence layer to simulate a completed
+ // merge operation. Chain should now unwind properly
+ fwd = _bottomLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority()));
+ CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+
+ std::shared_ptr<MergeBucketReply> reply(
+ new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+ reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<"));
+ _bottomLinks[executorNode]->sendUp(reply);
+
+ _topLinks[executorNode]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+ if (executorNode != lastNodeIdx) {
+ // Merge should not be removed yet from executor, since it's pending an unwind
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size());
+ CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()),
+ _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get());
+ }
+ // MergeBucketReply waiting to be sent back to node 2. NOTE: we don't have any
+ // transport context stuff set up here to perform the reply mapping, so we
+ // have to emulate it
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[executorNode]->getNumReplies());
+
+ StorageMessage::SP unwind = _topLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(executorNode), unwind->getAddress()->getIndex());
+
+ // eg: 0 -> 2 -> 1 -> 0. Or: 2 -> 1 -> 0 if no cycle
+ for (int i = (executorNode != lastNodeIdx ? _storageNodeCount - 1 : _storageNodeCount - 2); i >= 0; --i) {
+ //std::cout << "unwind " << i << "\n";
+
+ _topLinks[i]->sendDown(unwind);
+ _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[i]->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[i]->getActiveMerges().size());
+
+ unwind = _topLinks[i]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(i), unwind->getAddress()->getIndex());
+ }
+
+ const MergeBucketReply& mbr = dynamic_cast<const MergeBucketReply&>(*unwind);
+
+ CPPUNIT_ASSERT_EQUAL(ReturnCode::OK, mbr.getResult().getResult());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("Great success! :D-|-<"), mbr.getResult().getMessage());
+ CPPUNIT_ASSERT_EQUAL(bid, mbr.getBucketId());
+
+ } while (std::next_permutation(indices, indices + _storageNodeCount));
+
+ //std::cout << "\n" << *_topLinks[0] << "\n";
+}
+
+// Verifies that a node marked as source-only in the merge node set still
+// participates in the forwarding chain (0 -> 1 -> 2 -> back to 0), with the
+// merge ultimately executed on node 0.
+void
+MergeThrottlerTest::testWithSourceOnlyNode()
+{
+    BucketId bid(14, 0x1337);
+
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0);
+
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(2);
+    nodes.push_back(MergeBucketCommand::Node(1, true)); // true -> source-only
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(bid, nodes, UINT_MAX, 123));
+
+    cmd->setAddress(address);
+    _topLinks[0]->sendDown(cmd);
+
+    // Node 0 forwards to node 1 next in the chain.
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    CPPUNIT_ASSERT_EQUAL(uint16_t(1), fwd->getAddress()->getIndex());
+
+    _topLinks[1]->sendDown(fwd);
+
+    // The source-only node forwards onwards to node 2 rather than executing.
+    _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex());
+
+    _topLinks[2]->sendDown(fwd);
+
+    // Node 2 is last in the chain and cycles back to node 0 for execution.
+    _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    fwd = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    CPPUNIT_ASSERT_EQUAL(uint16_t(0), fwd->getAddress()->getIndex());
+
+    _topLinks[0]->sendDown(fwd);
+    _bottomLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    _bottomLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    // Simulate a completed merge coming up from the persistence layer.
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<"));
+    _bottomLinks[0]->sendUp(reply);
+
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+    fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(uint16_t(0), fwd->getAddress()->getIndex());
+
+    // Assume everything's fine from here on out
+}
+
+// 4.2 distributors don't guarantee they'll send to lowest node
+// index, so we must detect such situations and execute the merge
+// immediately rather than attempt to chain it. Test that this
+// is done correctly.
+void
+MergeThrottlerTest::test42DistributorBehavior()
+{
+    BucketId bid(32, 0xfeef00);
+
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(1);
+    nodes.push_back(2);
+    // NOTE: no cluster state version argument given here, matching what a
+    // legacy (4.2) distributor presumably sends.
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(bid, nodes, 1234));
+
+    // Send to node 1, which is not the lowest index
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1);
+
+    cmd->setAddress(address);
+    _topLinks[1]->sendDown(cmd);
+    _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    // Should now have been sent to persistence layer
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[1]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies()); // No reply sent yet
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[1]->getActiveMerges().size());
+
+    // Send reply up from persistence layer to simulate a completed
+    // merge operation. Merge should be removed from state.
+    _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!"));
+    _bottomLinks[1]->sendUp(reply);
+    _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[1]->getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size());
+
+    // Locally executed merge must be counted as a local OK in the metrics.
+    CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[1]->getMetrics().local.ok.getValue());
+}
+
+// Test that we don't take ownership of the merge command when we're
+// just passing it through to the persistence layer when receiving
+// a merge command that presumably comes from a 4.2 distributor
+void
+MergeThrottlerTest::test42DistributorBehaviorDoesNotTakeOwnership()
+{
+    BucketId bid(32, 0xfeef00);
+
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(1);
+    nodes.push_back(2);
+    // No cluster state version argument -> treated as a 4.2-style merge.
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(bid, nodes, 1234));
+
+    // Send to node 1, which is not the lowest index
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1);
+
+    cmd->setAddress(address);
+    _topLinks[1]->sendDown(cmd);
+    _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    // Should now have been sent to persistence layer
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[1]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies()); // No reply sent yet
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[1]->getActiveMerges().size());
+
+    _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // To ensure we don't try to deref any non-owned messages
+    framework::HttpUrlPath path("?xml");
+    std::ostringstream ss;
+    _throttlers[1]->reportStatus(ss, path);
+
+    // Flush throttler (synchronously). Should NOT generate a reply
+    // for the merge command, as it is not owned by the throttler
+    StorageLinkTest::callOnFlush(*_throttlers[1], true);
+
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size());
+
+    // Send a belated reply from persistence up just to ensure the
+    // throttler doesn't throw a fit if it receives an unknown merge
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!"));
+    _bottomLinks[1]->sendUp(reply);
+    _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    // Belated reply is passed straight through to the distributor side.
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[1]->getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size());
+}
+
+// Test that we don't take ownership of the merge command when we're
+// just passing it through to the persistence layer when we're at the
+// the end of the chain and also the designated executor
+void
+MergeThrottlerTest::testEndOfChainExecutionDoesNotTakeOwnership()
+{
+    BucketId bid(32, 0xfeef00);
+
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(2);
+    nodes.push_back(1);
+    nodes.push_back(0);
+    // Chain already contains nodes 0 and 1, so node 2 is the final hop.
+    std::vector<uint16_t> chain;
+    chain.push_back(0);
+    chain.push_back(1);
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(bid, nodes, 1234, 1, chain));
+
+    // Send to last node, which is not the lowest index
+    // NOTE(review): address is built with node index 3 while the command is
+    // delivered to node 2's link — presumably only the link it is sent down
+    // matters for this test; confirm the index is intentional.
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
+
+    cmd->setAddress(address);
+    _topLinks[2]->sendDown(cmd);
+    _bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    // Should now have been sent to persistence layer
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[2]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[2]->getNumReplies()); // No reply sent yet
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[2]->getActiveMerges().size());
+
+    _bottomLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // To ensure we don't try to deref any non-owned messages
+    framework::HttpUrlPath path("");
+    std::ostringstream ss;
+    _throttlers[2]->reportStatus(ss, path);
+
+    // Flush throttler (synchronously). Should NOT generate a reply
+    // for the merge command, as it is not owned by the throttler
+    StorageLinkTest::callOnFlush(*_throttlers[2], true);
+
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[2]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[2]->getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[2]->getActiveMerges().size());
+
+    // Send a belated reply from persistence up just to ensure the
+    // throttler doesn't throw a fit if it receives an unknown merge
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!"));
+    _bottomLinks[2]->sendUp(reply);
+    _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    // Belated reply is passed straight through to the distributor side.
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[2]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[2]->getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[2]->getActiveMerges().size());
+}
+
+// Test that nodes resending a merge command won't lead to duplicate
+// state registration/forwarding or erasing the already present state
+// information.
+void
+MergeThrottlerTest::testResendHandling()
+{
+    BucketId bid(32, 0xbadbed);
+
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(1);
+    nodes.push_back(2);
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(bid, nodes, 1234));
+
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1);
+
+    cmd->setAddress(address);
+    _topLinks[0]->sendDown(cmd);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // Resend from "distributor". Just use same message, as that won't matter here
+    _topLinks[0]->sendDown(cmd);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    // Reply should be BUSY
+    StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+            ReturnCode::BUSY);
+
+    // Pass the forwarded merge along the chain to node 1, then node 2.
+    _topLinks[1]->sendDown(fwd);
+    _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // Deliver the same forwarded command to node 2 twice; the duplicate
+    // must be rejected while the original registration stays intact.
+    _topLinks[2]->sendDown(fwd);
+    _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    _topLinks[2]->sendDown(fwd);
+    _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    // Reply should be BUSY
+    reply = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+            ReturnCode::BUSY);
+
+    fwd = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // Same duplicate-delivery check on node 0, which executes the merge
+    // (command reaches its persistence layer).
+    _topLinks[0]->sendDown(fwd);
+    _bottomLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+    _topLinks[0]->sendDown(fwd);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+            ReturnCode::BUSY);
+}
+
+// Test that once the throttle policy's pending-merge cap is reached, queued
+// merges are dequeued strictly in priority order (lowest numeric priority
+// value first) as active merges complete.
+void
+MergeThrottlerTest::testPriorityQueuing()
+{
+    // Fill up all active merges
+    std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(1);
+    nodes.push_back(2);
+    CPPUNIT_ASSERT(maxPending >= 4u);
+    for (std::size_t i = 0; i < maxPending; ++i) {
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234));
+        cmd->setPriority(100);
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 0 queued.
+    // Use the shared _messageWaitTime rather than a hard-coded timeout so
+    // this wait is as tolerant of slow test machines as every other wait.
+    _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime);
+
+    // Queue up some merges with different priorities
+    int priorities[4] = { 200, 150, 120, 240 };
+    int sortedPris[4] = { 120, 150, 200, 240 };
+    for (int i = 0; i < 4; ++i) {
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, i), nodes, 1234));
+        cmd->setPriority(priorities[i]);
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime);
+
+    // Remove all but 4 forwarded merges
+    for (std::size_t i = 0; i < maxPending - 4; ++i) {
+        _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    }
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(4), _topLinks[0]->getNumReplies());
+
+    // Now when we start replying to merges, queued merges should be
+    // processed in priority order
+    for (int i = 0; i < 4; ++i) {
+        StorageMessage::SP replyTo = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+        std::shared_ptr<MergeBucketReply> reply(
+                new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*replyTo)));
+        reply->setResult(ReturnCode(ReturnCode::OK, "whee"));
+        _topLinks[0]->sendDown(reply);
+    }
+
+    _topLinks[0]->waitForMessages(8, _messageWaitTime); // 4 merges, 4 replies
+    waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime);
+
+    // Dequeued merges must come out lowest-priority-value first.
+    for (int i = 0; i < 4; ++i) {
+        StorageMessage::SP cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+        CPPUNIT_ASSERT_EQUAL(uint8_t(sortedPris[i]), cmd->getPriority());
+    }
+}
+
+// Test that we can detect and reject merges that due to resending
+// and potential priority queue sneaking etc may end up with duplicates
+// in the queue for a merge that is already known.
+void
+MergeThrottlerTest::testCommandInQueueDuplicateOfKnownMerge()
+{
+    // Fill up all active merges and 1 queued one
+    std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+    CPPUNIT_ASSERT(maxPending < 100);
+    for (std::size_t i = 0; i < maxPending + 1; ++i) {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(2 + i);
+        nodes.push_back(5 + i);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234));
+        cmd->setPriority(100 - i);
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 1 queued
+    _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime);
+
+    // Add a merge for the same bucket twice to the queue
+    {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(12);
+        nodes.push_back(123);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf000feee), nodes, 1234));
+        _topLinks[0]->sendDown(cmd);
+    }
+    {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(124); // Different node set doesn't matter
+        nodes.push_back(14);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf000feee), nodes, 1234));
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
+
+    StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // Remove and success-reply for 2 merges. This will give enough room
+    // for the 2 first queued merges to be processed, the last one having a
+    // duplicate in the queue.
+    for (int i = 0; i < 2; ++i) {
+        StorageMessage::SP fwd2 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+        std::shared_ptr<MergeBucketReply> reply(
+                new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd2)));
+        reply->setResult(ReturnCode(ReturnCode::OK, ""));
+        _topLinks[0]->sendDown(reply);
+    }
+
+    _topLinks[0]->waitForMessages(maxPending + 1, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime);
+
+    // Remove all current merge commands/replies so we can work with a clean slate
+    _topLinks[0]->getRepliesOnce();
+    // Send a success-reply for fwd, allowing the duplicate from the queue
+    // to have its moment to shine only to then be struck down mercilessly
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, ""));
+    _topLinks[0]->sendDown(reply);
+
+    _topLinks[0]->waitForMessages(2, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime);
+
+    // First reply is the successful merge reply
+    StorageMessage::SP reply2 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply2).getResult().getResult(),
+            ReturnCode::OK);
+
+    // Second reply should be the BUSY-rejected duplicate
+    StorageMessage::SP reply1 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply1).getResult().getResult(),
+            ReturnCode::BUSY);
+    CPPUNIT_ASSERT(static_cast<MergeBucketReply&>(*reply1).getResult()
+                   .getMessage().find("out of date;") != std::string::npos);
+}
+
+// Test that sending a merge command to a node not in the set of
+// to-be-merged nodes is handled gracefully.
+// This is not a scenario that should ever actually happen, but for
+// the sake of robustness, include it anyway.
+void
+MergeThrottlerTest::testInvalidReceiverNode()
+{
+    // Node set deliberately excludes node 0, which receives the command.
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(1);
+    nodes.push_back(5);
+    nodes.push_back(9);
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(BucketId(32, 0xf00baaaa), nodes, 1234));
+
+    // Send to node with index 0
+    _topLinks[0]->sendDown(cmd);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+    // Expect an immediate REJECTED reply rather than forwarding/execution.
+    StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+            ReturnCode::REJECTED);
+    CPPUNIT_ASSERT(static_cast<MergeBucketReply&>(*reply).getResult()
+                   .getMessage().find("which is not in its forwarding chain") != std::string::npos);
+}
+
+// Test that the throttling policy kicks in after a certain number of
+// merges are forwarded and that the rest are queued in a prioritized
+// order.
+void
+MergeThrottlerTest::testForwardQueuedMerge()
+{
+    // Fill up all active merges and then 3 queued ones
+    std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+    CPPUNIT_ASSERT(maxPending < 100);
+    for (std::size_t i = 0; i < maxPending + 3; ++i) {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(2 + i);
+        nodes.push_back(5 + i);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234));
+        cmd->setPriority(100 - i); // Later merges get higher priority (lower value)
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 3 queued
+    _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
+
+    // Merge queue state should not be touched by worker thread now
+    // Peek at the head of the queue: the merge expected to go out next.
+    StorageMessage::SP nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg;
+
+    StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    // Remove all the rest of the active merges
+    while (!_topLinks[0]->getReplies().empty()) {
+        _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+    }
+
+    // Complete one active merge to free up a slot for the queue head.
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Celebrate good times come on"));
+    _topLinks[0]->sendDown(reply);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); // Success rewind reply
+
+    // Remove reply bound for distributor
+    StorageMessage::SP distReply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*distReply).getResult().getResult(),
+            ReturnCode::OK);
+
+    waitUntilMergeQueueIs(*_throttlers[0], 2, _messageWaitTime);
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[0]->getNumReplies());
+
+    // First queued merge should now have been registered and forwarded
+    fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<const MergeBucketCommand&>(*fwd).getBucketId(),
+            static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+
+    CPPUNIT_ASSERT(
+            static_cast<const MergeBucketCommand&>(*fwd).getNodes()
+            == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes());
+
+    // Ensure forwarded merge has a higher priority than the next queued one
+    CPPUNIT_ASSERT(fwd->getPriority() < _throttlers[0]->getMergeQueue().begin()->_msg->getPriority());
+
+    CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue());
+
+    /*framework::HttpUrlPath path("?xml");
+    _forwarders[0]->reportStatus(std::cerr, path);*/
+}
+
+// Verifies that a queued merge whose chain already contains the other nodes
+// (i.e. this node is the executor) is sent down to the persistence layer
+// rather than forwarded when a merge slot frees up.
+void
+MergeThrottlerTest::testExecuteQueuedMerge()
+{
+    MergeThrottler& throttler(*_throttlers[1]);
+    DummyStorageLink& topLink(*_topLinks[1]);
+    DummyStorageLink& bottomLink(*_bottomLinks[1]);
+
+    // Fill up all active merges and then 3 queued ones
+    std::size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
+    CPPUNIT_ASSERT(maxPending < 100);
+    for (std::size_t i = 0; i < maxPending + 3; ++i) {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(1);
+        nodes.push_back(5 + i);
+        nodes.push_back(7 + i);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1));
+        cmd->setPriority(250 - i + 5);
+        topLink.sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 3 queued
+    topLink.waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(throttler, 3, _messageWaitTime);
+
+    // Sneak in a higher priority message that is bound to be executed
+    // on the given node
+    {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(1);
+        nodes.push_back(0);
+        // Chain already contains node 0, so node 1 is the end of the chain.
+        std::vector<uint16_t> chain;
+        chain.push_back(0);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0x1337), nodes, 1234, 1, chain));
+        cmd->setPriority(0); // Highest possible priority; must be queued first
+        topLink.sendDown(cmd);
+    }
+
+    waitUntilMergeQueueIs(throttler, 4, _messageWaitTime);
+
+    // Merge queue state should not be touched by worker thread now
+    StorageMessage::SP nextMerge(throttler.getMergeQueue().begin()->_msg);
+    /*StorageMessage::SP nextMerge;
+    {
+        vespalib::LockGuard lock(_throttlers[0]->getStateLock());
+        // Dirty: have to check internal state
+        nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg;
+    }*/
+
+    // The sneaked-in merge must be at the head of the queue.
+    CPPUNIT_ASSERT_EQUAL(
+            BucketId(32, 0x1337),
+            dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+
+    StorageMessage::SP fwd(topLink.getAndRemoveMessage(MessageType::MERGEBUCKET));
+
+    // Remove all the rest of the active merges
+    while (!topLink.getReplies().empty()) {
+        topLink.getAndRemoveMessage(MessageType::MERGEBUCKET);
+    }
+
+    // Free up a merge slot
+    std::shared_ptr<MergeBucketReply> reply(
+            new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+    reply->setResult(ReturnCode(ReturnCode::OK, "Celebrate good times come on"));
+    topLink.sendDown(reply);
+
+    topLink.waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+    // Remove chain reply
+    StorageMessage::SP distReply(topLink.getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY));
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<MergeBucketReply&>(*distReply).getResult().getResult(),
+            ReturnCode::OK);
+
+    waitUntilMergeQueueIs(throttler, 3, _messageWaitTime);
+    bottomLink.waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+
+    // Executed merge goes down to persistence, not up as a forward.
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), topLink.getNumCommands());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(0), topLink.getNumReplies());
+    CPPUNIT_ASSERT_EQUAL(std::size_t(1), bottomLink.getNumCommands());
+
+    // First queued merge should now have been registered and sent down
+    StorageMessage::SP cmd(bottomLink.getAndRemoveMessage(MessageType::MERGEBUCKET));
+
+    CPPUNIT_ASSERT_EQUAL(
+            static_cast<const MergeBucketCommand&>(*cmd).getBucketId(),
+            static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+
+    CPPUNIT_ASSERT(
+            static_cast<const MergeBucketCommand&>(*cmd).getNodes()
+            == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes());
+}
+
+// Verifies that closing/flushing the throttler aborts all merges it owns
+// (both active and queued) with an ABORTED reply.
+void
+MergeThrottlerTest::testFlush()
+{
+    // Fill up all active merges and then 3 queued ones
+    std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+    CPPUNIT_ASSERT(maxPending < 100);
+    for (std::size_t i = 0; i < maxPending + 3; ++i) {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(1);
+        nodes.push_back(2);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1));
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 3 queued
+    _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
+
+    // Remove all forwarded commands
+    uint32_t removed = _topLinks[0]->getRepliesOnce().size();
+    CPPUNIT_ASSERT(removed >= 5);
+
+    // Flush the storage link, triggering an abort of all commands
+    // no matter what their current state is.
+    _topLinks[0]->close();
+    _topLinks[0]->flush();
+    _topLinks[0]->waitForMessages(maxPending + 3 - removed, _messageWaitTime);
+
+    // Every remaining merge must have been replied to with ABORTED.
+    while (!_topLinks[0]->getReplies().empty()) {
+        StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+        CPPUNIT_ASSERT_EQUAL(
+                ReturnCode::ABORTED,
+                static_cast<const MergeBucketReply&>(*reply).getResult().getResult());
+    }
+    // NOTE: merges that have been immediately executed (i.e. not cycled)
+    // on the node should _not_ be replied to, since they're not owned
+    // by the throttler at that point in time
+}
+
+// If a node goes down and another node has a merge chained through it in
+// its queue, the original node can receive a final chain hop forwarding
+// it knows nothing about when it comes back up. If this is not handled
+// properly, it will attempt to forward this node again with a bogus
+// index. This should be implicitly handled by checking for a full node
+void
+MergeThrottlerTest::testUnseenMergeWithNodeInChain()
+{
+    std::vector<MergeBucketCommand::Node> nodes;
+    nodes.push_back(0);
+    nodes.push_back(5);
+    nodes.push_back(9);
+    // Chain claims all nodes (including 0) have already been visited, even
+    // though node 0 has no record of this merge.
+    std::vector<uint16_t> chain;
+    chain.push_back(0);
+    chain.push_back(5);
+    chain.push_back(9);
+    std::shared_ptr<MergeBucketCommand> cmd(
+            new MergeBucketCommand(BucketId(32, 0xdeadbeef), nodes, 1234, 1, chain));
+
+    StorageMessageAddress address("storage", lib::NodeType::STORAGE, 9);
+
+    cmd->setAddress(address);
+    _topLinks[0]->sendDown(cmd);
+
+    // First, test that we get rejected when processing merge immediately
+    // Should get a rejection in return
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+    StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            ReturnCode::REJECTED,
+            dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
+
+    // Second, test that we get rejected before queueing up. This is to
+    // avoid a hypothetical deadlock scenario.
+    // Fill up all active merges
+    {
+
+        std::size_t maxPending(
+                _throttlers[0]->getThrottlePolicy().getMaxPendingCount());
+        for (std::size_t i = 0; i < maxPending; ++i) {
+            std::shared_ptr<MergeBucketCommand> fillCmd(
+                    new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234));
+            _topLinks[0]->sendDown(fillCmd);
+        }
+    }
+
+    // Resend the unseen merge now that all slots are busy; it must still be
+    // rejected up front instead of being queued.
+    _topLinks[0]->sendDown(cmd);
+
+    _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+    reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+    CPPUNIT_ASSERT_EQUAL(
+            ReturnCode::REJECTED,
+            dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
+}
+
+// Verifies that when a merge arrives carrying a newer cluster state version,
+// all queued merges with the older version are bounced back to the
+// distributor with WRONG_DISTRIBUTION.
+void
+MergeThrottlerTest::testMergeWithNewerClusterStateFlushesOutdatedQueued()
+{
+    // Fill up all active merges and then 3 queued ones with the same
+    // system state
+    std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+    CPPUNIT_ASSERT(maxPending < 100);
+    std::vector<api::StorageMessage::Id> ids;
+    for (std::size_t i = 0; i < maxPending + 3; ++i) {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(1);
+        nodes.push_back(2);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1));
+        ids.push_back(cmd->getMsgId()); // Remember ids so replies can be matched below
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Wait till we have maxPending replies and 3 queued
+    _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
+
+    // Send down merge with newer system state
+    {
+        std::vector<MergeBucketCommand::Node> nodes;
+        nodes.push_back(0);
+        nodes.push_back(1);
+        nodes.push_back(2);
+        std::shared_ptr<MergeBucketCommand> cmd(
+                new MergeBucketCommand(BucketId(32, 0x12345678), nodes, 1234, 2));
+        ids.push_back(cmd->getMsgId());
+        _topLinks[0]->sendDown(cmd);
+    }
+
+    // Queue should now be flushed with all messages being returned with
+    // WRONG_DISTRIBUTION
+    _topLinks[0]->waitForMessages(maxPending + 3, _messageWaitTime);
+    waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime);
+
+    // The 3 bounced replies must match the queued commands, in order, and
+    // carry the outdated cluster state version (1).
+    for (int i = 0; i < 3; ++i) {
+        StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+        CPPUNIT_ASSERT_EQUAL(
+                static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+                ReturnCode::WRONG_DISTRIBUTION);
+        CPPUNIT_ASSERT_EQUAL(1u, static_cast<MergeBucketReply&>(*reply).getClusterStateVersion());
+        CPPUNIT_ASSERT_EQUAL(ids[maxPending + i], reply->getMsgId());
+    }
+
+    CPPUNIT_ASSERT_EQUAL(uint64_t(3), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue());
+}
+
+void
+MergeThrottlerTest::testUpdatedClusterStateFlushesOutdatedQueued()
+{
+ // State is version 1. Send down several merges with state version 2.
+ std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ CPPUNIT_ASSERT(maxPending < 100);
+ std::vector<api::StorageMessage::Id> ids;
+ for (std::size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 2)); // cluster state version 2
+ ids.push_back(cmd->getMsgId());
+ _topLinks[0]->sendDown(cmd);
+ }
+
+ // Wait till we have maxPending replies and 3 queued
+ _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+ waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
+
+ // Send down new system state (also set it explicitly)
+ _servers[0]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:3"));
+ std::shared_ptr<api::SetSystemStateCommand> stateCmd(
+ new api::SetSystemStateCommand(lib::ClusterState("distributor:100 storage:100 version:3")));
+ _topLinks[0]->sendDown(stateCmd);
+
+ // Queue should now be flushed with all being replied to with WRONG_DISTRIBUTION
+ waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime);
+ _topLinks[0]->waitForMessages(maxPending + 3, _messageWaitTime);
+
+ for (int i = 0; i < 3; ++i) {
+ StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(
+ static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+ ReturnCode::WRONG_DISTRIBUTION);
+ CPPUNIT_ASSERT_EQUAL(2u, static_cast<MergeBucketReply&>(*reply).getClusterStateVersion()); // flushed replies keep version 2
+ CPPUNIT_ASSERT_EQUAL(ids[maxPending + i], reply->getMsgId());
+ }
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(3), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue());
+}
+
+void
+MergeThrottlerTest::test42MergesDoNotTriggerFlush()
+{
+ // Fill up all active merges and then 1 queued one
+ std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ CPPUNIT_ASSERT(maxPending < 100);
+ for (std::size_t i = 0; i < maxPending + 1; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1));
+ _topLinks[0]->sendDown(cmd);
+ }
+
+ // Wait till we have maxPending replies and 1 queued
+ _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime);
+
+ StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); // pop one forwarded merge; loop below drains the rest
+
+ // Remove all the rest of the active merges
+ while (!_topLinks[0]->getReplies().empty()) {
+ _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ }
+
+ // Send down a merge with a cluster state version of 0, which should
+ // be ignored and queued as usual
+ {
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xbaaadbed), nodes, 1234, 0)); // version 0 — presumably a legacy (4.2-era) merge without state info; TODO confirm
+ _topLinks[0]->sendDown(cmd);
+ }
+
+ waitUntilMergeQueueIs(*_throttlers[0], 2, _messageWaitTime); // queued, not flushed/rejected
+
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumReplies());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.wrongdistribution.getValue());
+}
+
+// Test that a merge that arrives with a state version that is less than
+// that of the node is rejected immediately (WRONG_DISTRIBUTION)
+void
+MergeThrottlerTest::testOutdatedClusterStateMergesAreRejectedOnArrival()
+{
+ _servers[0]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:10"));
+
+ // Send down a merge with a cluster state version of 9, which should
+ // be rejected
+ {
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 9)); // 9 < node's version 10
+ _topLinks[0]->sendDown(cmd);
+ }
+
+ _topLinks[0]->waitForMessages(1, _messageWaitTime);
+
+ StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(
+ static_cast<MergeBucketReply&>(*reply).getResult().getResult(),
+ ReturnCode::WRONG_DISTRIBUTION);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue());
+}
+
+// Test erroneous case where node receives merge where the merge does
+// not exist in the state, but it exists in the chain without the chain
+// being full. This is something that shouldn't happen, but must still
+// not crash the node
+void
+MergeThrottlerTest::testUnknownMergeWithSelfInChain()
+{
+ BucketId bid(32, 0xbadbed);
+
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::vector<uint16_t> chain;
+ chain.push_back(0); // chain already contains this node (0) even though the merge is not active here
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(bid, nodes, 1234, 1, chain));
+
+ StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1);
+
+ cmd->setAddress(address);
+ _topLinks[0]->sendDown(cmd);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+
+ StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+
+ CPPUNIT_ASSERT_EQUAL(
+ ReturnCode::REJECTED,
+ static_cast<MergeBucketReply&>(*reply).getResult().getResult());
+}
+
+void
+MergeThrottlerTest::testBusyReturnedOnFullQueue()
+{
+ std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ std::size_t maxQueue = _throttlers[0]->getMaxQueueSize();
+ CPPUNIT_ASSERT(maxPending < 100);
+ for (std::size_t i = 0; i < maxPending + maxQueue; ++i) { // saturate both active set and queue
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xf00000 + i), nodes, 1234, 1));
+ _topLinks[0]->sendDown(cmd);
+ }
+
+ // Wait till we have maxPending replies and maxQueue queued
+ _topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
+ waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime);
+
+ // Clear all forwarded merges
+ _topLinks[0]->getRepliesOnce();
+ // Send down another merge which should be immediately busy-returned
+ {
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(0);
+ nodes.push_back(1);
+ nodes.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xf000baaa), nodes, 1234, 1));
+ _topLinks[0]->sendDown(cmd);
+ }
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+ StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+
+ CPPUNIT_ASSERT_EQUAL(
+ BucketId(32, 0xf000baaa),
+ static_cast<MergeBucketReply&>(*reply).getBucketId());
+
+ CPPUNIT_ASSERT_EQUAL(
+ ReturnCode::BUSY,
+ static_cast<MergeBucketReply&>(*reply).getResult().getResult());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), // busy is accounted as a local failure, not a chained one
+ _throttlers[0]->getMetrics().chaining
+ .failures.busy.getValue());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1),
+ _throttlers[0]->getMetrics().local
+ .failures.busy.getValue());
+}
+
+void
+MergeThrottlerTest::testBrokenCycle()
+{
+ std::vector<MergeBucketCommand::Node> nodes;
+ nodes.push_back(1);
+ nodes.push_back(0);
+ nodes.push_back(2);
+ {
+ std::vector<uint16_t> chain;
+ chain.push_back(0);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain));
+ _topLinks[1]->sendDown(cmd);
+ }
+
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ StorageMessage::SP fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex()); // forwarded to the last node in the chain
+
+ // Send cycled merge which will be executed
+ {
+ std::vector<uint16_t> chain;
+ chain.push_back(0);
+ chain.push_back(1);
+ chain.push_back(2);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain));
+ _topLinks[1]->sendDown(cmd);
+ }
+
+ _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ StorageMessage::SP cycled = _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+
+ // Now, node 2 goes down, auto sending back a failed merge
+ std::shared_ptr<MergeBucketReply> nodeDownReply(
+ new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd)));
+ nodeDownReply->setResult(ReturnCode(ReturnCode::NOT_CONNECTED, "Node went sightseeing"));
+
+ _topLinks[1]->sendDown(nodeDownReply);
+ // Merge reply also arrives from persistence
+ std::shared_ptr<MergeBucketReply> persistenceReply(
+ new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cycled)));
+ persistenceReply->setResult(ReturnCode(ReturnCode::ABORTED, "Oh dear"));
+ _bottomLinks[1]->sendUp(persistenceReply);
+
+ // Should now be two replies from node 1, one to node 2 and one to node 0
+ // since we must handle broken chains
+ _topLinks[1]->waitForMessages(2, _messageWaitTime);
+ // Unwind reply shares the result of the persistence reply
+ for (int i = 0; i < 2; ++i) {
+ StorageMessage::SP reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(ReturnCode::ABORTED, "Oh dear"),
+ static_cast<MergeBucketReply&>(*reply).getResult());
+ }
+
+ // Make sure it has been removed from the internal state so we can
+ // send new merges for the bucket
+ {
+ std::vector<uint16_t> chain;
+ chain.push_back(0);
+ std::shared_ptr<MergeBucketCommand> cmd(
+ new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain));
+ _topLinks[1]->sendDown(cmd);
+ }
+
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+ CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex());
+}
+
+void
+MergeThrottlerTest::sendAndExpectReply(
+ const std::shared_ptr<api::StorageMessage>& msg,
+ const api::MessageType& expectedReplyType,
+ api::ReturnCode::Result expectedResultCode)
+{ // Helper: send msg via node 0's top link and assert the reply's type and result code.
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime);
+ StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage(
+ expectedReplyType));
+ api::StorageReply& storageReply(
+ dynamic_cast<api::StorageReply&>(*reply));
+ CPPUNIT_ASSERT_EQUAL(expectedResultCode,
+ storageReply.getResult().getResult());
+}
+
+void
+MergeThrottlerTest::testGetBucketDiffCommandNotInActiveSetIsRejected()
+{
+ document::BucketId bucket(16, 1234);
+ std::vector<api::GetBucketDiffCommand::Node> nodes;
+ std::shared_ptr<api::GetBucketDiffCommand> getDiffCmd(
+ new api::GetBucketDiffCommand(bucket, nodes, api::Timestamp(1234)));
+
+ sendAndExpectReply(getDiffCmd,
+ api::MessageType::GETBUCKETDIFF_REPLY,
+ api::ReturnCode::ABORTED);
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[0]->getNumCommands()); // command never reached persistence
+}
+
+void
+MergeThrottlerTest::testApplyBucketDiffCommandNotInActiveSetIsRejected()
+{
+ document::BucketId bucket(16, 1234);
+ std::vector<api::GetBucketDiffCommand::Node> nodes;
+ std::shared_ptr<api::ApplyBucketDiffCommand> applyDiffCmd(
+ new api::ApplyBucketDiffCommand(bucket, nodes, api::Timestamp(1234)));
+
+ sendAndExpectReply(applyDiffCmd,
+ api::MessageType::APPLYBUCKETDIFF_REPLY,
+ api::ReturnCode::ABORTED);
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[0]->getNumCommands()); // command never reached persistence
+}
+
+api::MergeBucketCommand::SP
+MergeThrottlerTest::sendMerge(const MergeBuilder& builder)
+{ // Helper: build a merge and send it down via the top link of the merge's first node.
+ api::MergeBucketCommand::SP cmd(builder.create());
+ _topLinks[builder._nodes[0]]->sendDown(cmd);
+ return cmd;
+}
+
+void
+MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges()
+{
+ document::BucketId bucket(16, 6789);
+ _throttlers[0]->getThrottlePolicy().setMaxPendingCount(1); // only one active merge at a time
+
+ // Merge will be forwarded (i.e. active).
+ sendMerge(MergeBuilder(bucket).clusterStateVersion(10));
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ StorageMessage::SP fwd(_topLinks[0]->getAndRemoveMessage(
+ MessageType::MERGEBUCKET));
+
+ _topLinks[0]->sendDown(makeSystemStateCmd(
+ "version:11 distributor:100 storage:100"));
+ // Cannot send reply until we're unwinding
+ CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumReplies());
+
+ // Trying to diff the bucket should now fail
+ {
+ std::shared_ptr<api::GetBucketDiffCommand> getDiffCmd(
+ new api::GetBucketDiffCommand(bucket, {}, api::Timestamp(123)));
+
+ sendAndExpectReply(getDiffCmd,
+ api::MessageType::GETBUCKETDIFF_REPLY,
+ api::ReturnCode::ABORTED);
+ }
+}
+
+// TODO test message queue aborting (use rendezvous functionality--make guard)
+
+} // namespace storage
diff --git a/storage/src/tests/storageserver/priorityconvertertest.cpp b/storage/src/tests/storageserver/priorityconvertertest.cpp
new file mode 100644
index 00000000000..ecbdcfb6b91
--- /dev/null
+++ b/storage/src/tests/storageserver/priorityconvertertest.cpp
@@ -0,0 +1,104 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/storage/storageserver/priorityconverter.h>
+#include <tests/common/testhelper.h>
+
+namespace storage {
+
+struct PriorityConverterTest : public CppUnit::TestFixture
+{
+ std::unique_ptr<PriorityConverter> _converter;
+
+ void setUp() {
+ vdstestlib::DirConfig config(getStandardConfig(true));
+ _converter.reset(new PriorityConverter(config.getConfigId()));
+ }
+
+ void testNormalUsage();
+ void testLowestPriorityIsReturnedForUnknownCode();
+
+ CPPUNIT_TEST_SUITE(PriorityConverterTest);
+ CPPUNIT_TEST(testNormalUsage);
+ CPPUNIT_TEST(testLowestPriorityIsReturnedForUnknownCode);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(PriorityConverterTest);
+
+void PriorityConverterTest::testNormalUsage()
+{
+ for (int p=0; p<16; ++p) { // one check per documentapi priority level
+ CPPUNIT_ASSERT_EQUAL(
+ (uint8_t)(50+p*10), // storage priority = 50 + 10 per documentapi level
+ _converter->toStoragePriority(
+ static_cast<documentapi::Priority::Value>(p)));
+ }
+ for (int i=0; i<256; ++i) { // exhaustively verify the reverse mapping
+ uint8_t p = i;
+ if (p <= 50) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGHEST,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 60) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_VERY_HIGH,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 70) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_1,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 80) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_2,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 90) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_3,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 100) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_1,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 110) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_2,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 120) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_3,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 130) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_4,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 140) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_5,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 150) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_6,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 160) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_1,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 170) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_2,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 180) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_3,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 190) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_VERY_LOW,
+ _converter->toDocumentPriority(p));
+ } else if (p <= 200) {
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOWEST,
+ _converter->toDocumentPriority(p));
+ } else { // >200 also maps to lowest; branch kept separate for symmetry with the bands above
+ CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOWEST,
+ _converter->toDocumentPriority(p));
+ }
+ }
+}
+
+
+void
+PriorityConverterTest::testLowestPriorityIsReturnedForUnknownCode()
+{
+ CPPUNIT_ASSERT_EQUAL(255, // 123 is not a valid documentapi::Priority value; expect lowest storage priority
+ static_cast<int>(_converter->toStoragePriority(
+ static_cast<documentapi::Priority::Value>(123))));
+}
+
+}
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
new file mode 100644
index 00000000000..68a35ac37d9
--- /dev/null
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -0,0 +1,264 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <boost/pointer_cast.hpp>
+#include <cppunit/extensions/HelperMacros.h>
+#include <iostream>
+#include <vespa/metrics/metricmanager.h>
+#include <string>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageapi/message/state.h>
+#include <vespa/vdslib/state/nodestate.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storage/storageserver/statemanager.h>
+#include <vespa/storage/common/hostreporter/hostinfo.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/vespalib/data/slime/type.h>
+
+using storage::lib::NodeState;
+using storage::lib::NodeType;
+using storage::lib::State;
+using storage::lib::ClusterState;
+
+namespace storage {
+
+struct StateManagerTest : public CppUnit::TestFixture {
+ std::unique_ptr<TestServiceLayerApp> _node;
+ std::unique_ptr<DummyStorageLink> _upper;
+ std::unique_ptr<metrics::MetricManager> _metricManager;
+ StateManager* _manager; // non-owning; ownership transferred to _upper's link chain in setUp()
+ DummyStorageLink* _lower; // non-owning; likewise owned by the chain
+
+ StateManagerTest();
+
+ void setUp();
+ void tearDown();
+
+ void testSystemState();
+ void testReportedNodeState();
+ void testClusterStateVersion();
+
+ CPPUNIT_TEST_SUITE(StateManagerTest);
+ CPPUNIT_TEST(testSystemState);
+ CPPUNIT_TEST(testReportedNodeState);
+ CPPUNIT_TEST(testClusterStateVersion);
+ CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(StateManagerTest);
+
+StateManagerTest::StateManagerTest()
+ : _node(),
+ _upper(),
+ _manager(0), // raw observers start null; wired up in setUp()
+ _lower(0)
+{
+}
+
+void
+StateManagerTest::setUp() {
+ try{
+ vdstestlib::DirConfig config(getStandardConfig(true)); // NOTE(review): config is unused after construction — verify it is still needed
+ _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(2)));
+ // Clock will increase 1 sec per call.
+ _node->getClock().setAbsoluteTimeInSeconds(1);
+ _metricManager.reset(new metrics::MetricManager);
+ _upper.reset(new DummyStorageLink());
+ _manager = new StateManager(_node->getComponentRegister(),
+ *_metricManager,
+ std::unique_ptr<HostInfo>(new HostInfo));
+ _lower = new DummyStorageLink();
+ _upper->push_back(StorageLink::UP(_manager)); // chain takes ownership of _manager and _lower
+ _upper->push_back(StorageLink::UP(_lower));
+ _upper->open();
+ } catch (std::exception& e) {
+ // Failure is only logged; subsequent tests will then fail on null members.
+ std::cerr << "Failed to static initialize objects: " << e.what()
+ << "\n";
+ }
+}
+
+void
+StateManagerTest::tearDown() {
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumReplies()); // no test may leave unprocessed messages behind
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumCommands());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumCommands());
+ _manager = nullptr; // non-owning; the link chain below owns and destroys it
+ _lower = nullptr;
+ _upper->close();
+ _upper->flush();
+ _upper.reset();
+ _node.reset();
+ _metricManager.reset();
+}
+
+#define GET_ONLY_OK_REPLY(varname) \
+{ \
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \
+ CPPUNIT_ASSERT(_upper->getReply(0)->getType().isReply()); \
+ varname = std::dynamic_pointer_cast<api::StorageReply>( \
+ _upper->getReply(0)); \
+ CPPUNIT_ASSERT(varname != 0); \
+ _upper->reset(); \
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), \
+ varname->getResult()); \
+} // Pops the single pending reply from _upper into varname, asserting it exists and has an OK result.
+
+void
+StateManagerTest::testSystemState()
+{
+ std::shared_ptr<api::StorageReply> reply;
+ // Verify initial state on startup
+ ClusterState::CSP currentState = _manager->getSystemState();
+ CPPUNIT_ASSERT_EQUAL(std::string("cluster:d"),
+ currentState->toString(false));
+
+ NodeState::CSP currentNodeState = _manager->getCurrentNodeState();
+ CPPUNIT_ASSERT_EQUAL(std::string("s:d"), currentNodeState->toString(false));
+
+ ClusterState sendState("storage:4 .2.s:m"); // node 2 (this node, per setUp) marked maintenance
+ std::shared_ptr<api::SetSystemStateCommand> cmd(
+ new api::SetSystemStateCommand(sendState));
+ _upper->sendDown(cmd);
+ GET_ONLY_OK_REPLY(reply);
+
+ currentState = _manager->getSystemState();
+ CPPUNIT_ASSERT_EQUAL(sendState, *currentState);
+
+ currentNodeState = _manager->getCurrentNodeState();
+ CPPUNIT_ASSERT_EQUAL(std::string("s:m"), currentNodeState->toString(false)); // our node's state extracted from the cluster state
+}
+
+namespace {
+ struct MyStateListener : public StateListener {
+ const NodeStateUpdater& updater;
+ lib::NodeState current;
+ std::ostringstream ost; // accumulates "old -> new" transition log lines
+
+ MyStateListener(const NodeStateUpdater& upd)
+ : updater(upd), current(*updater.getReportedNodeState()) {}
+
+ void handleNewState()
+ {
+ // Record each reported-state transition as "old -> new".
+ ost << current << " -> ";
+ current = *updater.getReportedNodeState();
+ ost << current << "\n";
+ }
+ };
+}
+
+void
+StateManagerTest::testReportedNodeState()
+{
+ std::shared_ptr<api::StorageReply> reply;
+ // Add a state listener to check that we get events.
+ MyStateListener stateListener(*_manager);
+ _manager->addStateListener(stateListener);
+ // Test that initial state is initializing
+ NodeState::CSP nodeState = _manager->getReportedNodeState();
+ CPPUNIT_ASSERT_EQUAL(std::string("s:i b:58 i:0 t:1"), nodeState->toString(false));
+ // Test that it works to update the state
+ {
+ NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock()); // state changes require the change lock
+ NodeState ns(*_manager->getReportedNodeState());
+ ns.setState(State::UP);
+ _manager->setReportedNodeState(ns);
+ }
+ // And that we get the change both through state interface
+ nodeState = _manager->getReportedNodeState();
+ CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"),
+ nodeState->toString(false));
+ // And get node state command (no expected state)
+ std::shared_ptr<api::GetNodeStateCommand> cmd(
+ new api::GetNodeStateCommand(lib::NodeState::UP()));
+ _upper->sendDown(cmd);
+ GET_ONLY_OK_REPLY(reply);
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY,
+ reply->getType());
+ nodeState.reset(new NodeState(
+ dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()));
+ CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"),
+ nodeState->toString(false));
+ // We should also get it with wrong expected state
+ cmd.reset(new api::GetNodeStateCommand(lib::NodeState::UP(new NodeState(NodeType::STORAGE, State::INITIALIZING))));
+ _upper->sendDown(cmd);
+ GET_ONLY_OK_REPLY(reply);
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY,
+ reply->getType());
+ nodeState.reset(new NodeState(
+ dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()));
+ CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"),
+ nodeState->toString(false));
+ // With correct wanted state we should not get response right away
+ cmd.reset(new api::GetNodeStateCommand(
+ lib::NodeState::UP(new NodeState("s:u b:58 t:1", &NodeType::STORAGE))));
+ _upper->sendDown(cmd);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); // reply is held back until the state actually changes
+ // But when we update state, we get the reply
+ {
+ NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock());
+ NodeState ns(*_manager->getReportedNodeState());
+ ns.setState(State::STOPPING);
+ ns.setDescription("Stopping node");
+ _manager->setReportedNodeState(ns);
+ }
+
+ GET_ONLY_OK_REPLY(reply);
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY,
+ reply->getType());
+ nodeState.reset(new NodeState(
+ dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()));
+ CPPUNIT_ASSERT_EQUAL(std::string("s:s b:58 t:1 m:Stopping\\x20node"),
+ nodeState->toString(false));
+
+ // Removing state listener, it stops getting updates
+ _manager->removeStateListener(stateListener);
+ // Do another update which listener should not get..
+ {
+ NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock());
+ NodeState ns(*_manager->getReportedNodeState());
+ ns.setState(State::UP);
+ _manager->setReportedNodeState(ns);
+ }
+ std::string expectedEvents =
+ "s:i b:58 i:0 t:1 -> s:u b:58 t:1\n"
+ "s:u b:58 t:1 -> s:s b:58 t:1 m:Stopping\\x20node\n";
+ CPPUNIT_ASSERT_EQUAL(expectedEvents, stateListener.ost.str());
+}
+
+void
+StateManagerTest::testClusterStateVersion()
+{
+ ClusterState state(*_manager->getSystemState());
+ state.setVersion(123);
+ _manager->setClusterState(state);
+
+ std::string nodeInfoString(_manager->getNodeInfo());
+ // (decode consumes the JSON string directly; no intermediate Memory object is needed)
+ vespalib::Slime nodeInfo;
+ vespalib::slime::JsonFormat::decode(nodeInfoString, nodeInfo);
+
+ vespalib::slime::Symbol lookupSymbol =
+ nodeInfo.lookup("cluster-state-version");
+ if (lookupSymbol.undefined()) {
+ CPPUNIT_FAIL("No cluster-state-version was found in the node info");
+ }
+
+ auto& cursor = nodeInfo.get();
+ auto& clusterStateVersionCursor = cursor["cluster-state-version"];
+ if (!clusterStateVersionCursor.valid()) {
+ CPPUNIT_FAIL("No cluster-state-version was found in the node info");
+ }
+
+ if (clusterStateVersionCursor.type().getId() != vespalib::slime::LONG::ID) {
+ CPPUNIT_FAIL("No cluster-state-version was found in the node info");
+ }
+
+ int version = clusterStateVersionCursor.asLong();
+ CPPUNIT_ASSERT_EQUAL(123, version);
+}
+
+} // storage
+
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
new file mode 100644
index 00000000000..ef1592bce80
--- /dev/null
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -0,0 +1,279 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <vespa/log/log.h>
+#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/storageserver/applicationgenerationfetcher.h>
+#include <vespa/storage/storageserver/statereporter.h>
+#include <tests/common/teststorageapp.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/dummystoragelink.h>
+
+LOG_SETUP(".test.statereporter");
+
+namespace storage {
+
+// Stub ApplicationGenerationFetcher returning a fixed config generation and
+// component name, so StateReporter has something deterministic to report.
+// NOTE(review): "Fether" looks like a typo for "Fetcher"; renaming would
+// touch every use site, so it is only flagged here.
+class DummyApplicationGenerationFether : public ApplicationGenerationFetcher {
+public:
+    virtual int64_t getGeneration() const { return 1; }
+    virtual std::string getComponentName() const { return "component"; }
+};
+
+// CppUnit fixture exercising StateReporter's /state/v1/{config,health,metrics}
+// endpoints against a dummy service-layer node with fake clock and metrics.
+struct StateReporterTest : public CppUnit::TestFixture {
+    FastOS_ThreadPool _threadPool;
+    // Owned by _node; raw pointer cached in setUp() for convenience.
+    framework::defaultimplementation::FakeClock* _clock;
+    std::unique_ptr<TestServiceLayerApp> _node;
+    std::unique_ptr<DummyStorageLink> _top;
+    DummyApplicationGenerationFether _generationFetcher;
+    std::unique_ptr<StateReporter> _stateReporter;
+    std::unique_ptr<vdstestlib::DirConfig> _config;
+    std::unique_ptr<metrics::MetricSet> _topSet;
+    std::unique_ptr<metrics::MetricManager> _metricManager;
+    std::shared_ptr<FileStorMetrics> _filestorMetrics;
+
+    StateReporterTest();
+
+    void setUp();
+    void tearDown();
+    // NOTE(review): declared but no definition is visible in this file.
+    void runLoad(uint32_t count = 1);
+
+    void testReportConfigGeneration();
+    void testReportHealth();
+    void testReportMetrics();
+
+    CPPUNIT_TEST_SUITE(StateReporterTest);
+    CPPUNIT_TEST(testReportConfigGeneration);
+    CPPUNIT_TEST(testReportHealth);
+    CPPUNIT_TEST(testReportMetrics);
+    CPPUNIT_TEST_SUITE_END();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(StateReporterTest);
+
+namespace {
+    // Adapts the storage framework clock to the MetricManager timer
+    // interface, letting tests drive metric snapshots via the fake clock.
+    struct MetricClock : public metrics::MetricManager::Timer
+    {
+        framework::Clock& _clock;
+        MetricClock(framework::Clock& c) : _clock(c) {}
+        virtual time_t getTime() const
+        { return _clock.getTimeInSeconds().getTime(); }
+        virtual time_t getTimeInMilliSecs() const
+        { return _clock.getTimeInMillis().getTime(); }
+    };
+}
+
+// _clock starts out null; it is pointed at the node's fake clock in setUp().
+StateReporterTest::StateReporterTest()
+    : _threadPool(256*1024),
+      _clock(0),
+      _top(),
+      _stateReporter()
+{
+}
+
+// Builds the full test harness: clean vdsroot, dummy node with 4 disks,
+// metric manager driven by the fake clock, and the StateReporter under test.
+void StateReporterTest::setUp() {
+    // NOTE(review): under NDEBUG this assert (and the system() call inside
+    // it) is compiled out, skipping the cleanup entirely.
+    assert(system("rm -rf vdsroot") == 0);
+    _config.reset(new vdstestlib::DirConfig(getStandardConfig(true)));
+    try {
+        _node.reset(new TestServiceLayerApp(DiskCount(4), NodeIndex(0),
+                                            _config->getConfigId()));
+        _node->setupDummyPersistence();
+        _clock = &_node->getClock();
+        _clock->setAbsoluteTimeInSeconds(1000000);
+        _top.reset(new DummyStorageLink);
+    } catch (config::InvalidConfigException& e) {
+        // NOTE(review): only logs and continues — if construction failed,
+        // *_clock below dereferences a null pointer.
+        fprintf(stderr, "%s\n", e.what());
+    }
+    _metricManager.reset(new metrics::MetricManager(
+            std::unique_ptr<metrics::MetricManager::Timer>(
+                new MetricClock(*_clock))));
+    _topSet.reset(new metrics::MetricSet("vds", "", ""));
+    {
+        // Registration requires holding the metric lock.
+        metrics::MetricLockGuard guard(_metricManager->getMetricLock());
+        _metricManager->registerMetric(guard, *_topSet);
+    }
+
+    _stateReporter.reset(new StateReporter(
+            _node->getComponentRegister(),
+            *_metricManager,
+            _generationFetcher,
+            "status"));
+
+    uint16_t diskCount = _node->getPartitions().size();
+    documentapi::LoadTypeSet::SP loadTypes(_node->getLoadTypes());
+
+    // Filestor metrics registered under "vds" so testReportMetrics can find
+    // vds.filestor.* counters in the report.
+    _filestorMetrics.reset(new FileStorMetrics(
+            _node->getLoadTypes()->getMetricLoadTypes()));
+    _filestorMetrics->initDiskMetrics(
+            diskCount, loadTypes->getMetricLoadTypes(), 1);
+    _topSet->registerMetric(*_filestorMetrics);
+
+    _metricManager->init(_config->getConfigId(), _node->getThreadPool());
+}
+
+// Tears members down in reverse dependency order; the metric manager is
+// stopped first so no snapshot thread touches metrics being destroyed.
+// NOTE(review): reset(0) works but reset() / reset(nullptr) is the
+// conventional spelling.
+void StateReporterTest::tearDown() {
+    _metricManager->stop();
+    _stateReporter.reset(0);
+    _topSet.reset(0);
+    _metricManager.reset(0);
+    _top.reset(0);
+    _node.reset(0);
+    _config.reset(0);
+    _filestorMetrics.reset();
+}
+
+// Parses `jsonData` into a local `slime` object visible to the enclosing
+// scope; fails the test with the re-encoded JSON in the message if the
+// input does not parse completely. (Comments cannot go inside the macro:
+// a // comment would swallow the line-continuation backslash.)
+#define PARSE_JSON(jsonData) \
+vespalib::Slime slime; \
+{ \
+    using namespace vespalib::slime; \
+    size_t parsed = JsonFormat::decode(Memory(jsonData), slime); \
+    SimpleBuffer buffer; \
+    JsonFormat::encode(slime, buffer, false); \
+    if (jsonData.size() != parsed) { \
+        std::ostringstream error; \
+        error << "Failed to parse JSON: '\n" \
+              << jsonData << "'\n:" << buffer.get().make_string() << "\n"; \
+        CPPUNIT_ASSERT_EQUAL_MSG(error.str(), jsonData.size(), parsed); \
+    } \
+}
+
+// Asserts that config.<component>.generation in the JSON report equals
+// `generation` (compared as double, since Slime numbers decode as such).
+#define ASSERT_GENERATION(jsonData, component, generation) \
+{ \
+    PARSE_JSON(jsonData); \
+    CPPUNIT_ASSERT_EQUAL( \
+            generation, \
+            slime.get()["config"][component]["generation"].asDouble()); \
+}
+
+// Asserts that the health report's status.code and status.message fields
+// match the expected strings.
+#define ASSERT_NODE_STATUS(jsonData, code, message) \
+{ \
+    PARSE_JSON(jsonData); \
+    CPPUNIT_ASSERT_EQUAL( \
+            vespalib::string(code), \
+            slime.get()["status"]["code"].asString().make_string()); \
+    CPPUNIT_ASSERT_EQUAL( \
+            vespalib::string(message), \
+            slime.get()["status"]["message"].asString().make_string()); \
+}
+
+// Scans metrics.values in the JSON report for the filestor get/put sum
+// counters and asserts their counts; also sanity-checks that the report
+// contains a substantial number (>100) of metrics.
+#define ASSERT_METRIC_GET_PUT(jsonData, expGetCount, expPutCount) \
+{ \
+    PARSE_JSON(jsonData); \
+    double getCount = -1; \
+    double putCount = -1; \
+    size_t metricCount = slime.get()["metrics"]["values"].children(); \
+    /*std::cerr << "\nmetric count=" << metricCount << "\n";*/ \
+    for (size_t j=0; j<metricCount; j++) { \
+        const vespalib::string name = slime.get()["metrics"]["values"][j]["name"] \
+                                      .asString().make_string(); \
+        if (name.compare("vds.filestor.alldisks.allthreads." \
+                         "get.sum.count") == 0) \
+        { \
+            getCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
+                       .asDouble(); \
+        } else if (name.compare("vds.filestor.alldisks.allthreads." \
+                                "put.sum.count") == 0) \
+        { \
+            putCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
+                       .asDouble(); \
+        } \
+    } \
+    CPPUNIT_ASSERT_EQUAL(expGetCount, getCount); \
+    CPPUNIT_ASSERT_EQUAL(expPutCount, putCount); \
+    CPPUNIT_ASSERT(metricCount > 100); \
+}
+
+
+// /state/v1/config must report the generation (1) supplied by the dummy
+// generation fetcher under its component name.
+void StateReporterTest::testReportConfigGeneration() {
+    std::ostringstream ost;
+    framework::HttpUrlPath path("/state/v1/config");
+    _stateReporter->reportStatus(ost, path);
+    std::string jsonData = ost.str();
+    //std::cerr << "\nConfig: " << jsonData << "\n";
+    ASSERT_GENERATION(jsonData, "component", 1.0);
+}
+
+// Drives the node through each lib::State and checks /state/v1/health:
+// every state except UP maps to code "down" with a descriptive message;
+// UP maps to "up" with an empty message. The three arrays below are
+// parallel, indexed by the same i.
+void StateReporterTest::testReportHealth() {
+    const int stateCount = 7;
+    const lib::NodeState nodeStates[stateCount] = {
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::UNKNOWN),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::MAINTENANCE),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::DOWN),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::STOPPING),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::INITIALIZING),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::RETIRED),
+        lib::NodeState(lib::NodeType::STORAGE, lib::State::UP)
+    };
+    const char* codes[stateCount] = {
+        "down",
+        "down",
+        "down",
+        "down",
+        "down",
+        "down",
+        "up"
+    };
+    const char* messages[stateCount] = {
+        "Node state: Unknown",
+        "Node state: Maintenance",
+        "Node state: Down",
+        "Node state: Stopping",
+        "Node state: Initializing, init progress 0",
+        "Node state: Retired",
+        ""
+    };
+
+    framework::HttpUrlPath path("/state/v1/health");
+    for (int i=0; i<stateCount; i++) {
+        _node->getStateUpdater().setCurrentNodeState(nodeStates[i]);
+        std::ostringstream ost;
+        _stateReporter->reportStatus(ost, path);
+        std::string jsonData = ost.str();
+        //std::cerr << "\nHealth " << i << ":" << jsonData << "\n";
+        ASSERT_NODE_STATUS(jsonData, codes[i], messages[i]);
+    }
+}
+
+// Bumps a get counter, advances the fake clock past a 5-minute metric
+// snapshot, then bumps a put counter. /state/v1/metrics (with and without a
+// consumer) must report get=1 (snapshotted) and put=0 (still only in the
+// active, un-snapshotted metrics).
+void StateReporterTest::testReportMetrics() {
+    FileStorDiskMetrics& disk0(*_filestorMetrics->disks[0]);
+    FileStorThreadMetrics& thread0(*disk0.threads[0]);
+
+    LOG(info, "Adding to get metric");
+
+    using documentapi::LoadType;
+    thread0.get[LoadType::DEFAULT].count.inc(1);
+
+    LOG(info, "Waiting for 5 minute snapshot to be taken");
+    // Wait until active metrics have been added to 5 min snapshot and reset
+    for (uint32_t i=0; i<6; ++i) {
+        _clock->addSecondsToTime(60);
+        _metricManager->timeChangedNotification();
+        // Busy-wait (1 ms sleeps) until the manager's worker thread has
+        // caught up with the advanced fake clock.
+        while (
+            uint64_t(_metricManager->getLastProcessedTime())
+                    < _clock->getTimeInSeconds().getTime())
+        {
+            FastOS_Thread::Sleep(1);
+        }
+    }
+    LOG(info, "5 minute snapshot should have been taken. Adding put count");
+
+    thread0.put[LoadType::DEFAULT].count.inc(1);
+
+    const int pathCount = 2;
+    const char* paths[pathCount] = {
+        "/state/v1/metrics",
+        "/state/v1/metrics?consumer=status"
+    };
+
+    for (int i=0; i<pathCount; i++) {
+        framework::HttpUrlPath path(paths[i]);
+        std::ostringstream ost;
+        _stateReporter->reportStatus(ost, path);
+        std::string jsonData = ost.str();
+        //std::cerr << "\nMetrics:" << jsonData << "\n";
+        ASSERT_METRIC_GET_PUT(jsonData, 1.0, 0.0);
+    }
+ }
+
+} // storage
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.cpp b/storage/src/tests/storageserver/testvisitormessagesession.cpp
new file mode 100644
index 00000000000..e814f6cf229
--- /dev/null
+++ b/storage/src/tests/storageserver/testvisitormessagesession.cpp
@@ -0,0 +1,78 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <tests/storageserver/testvisitormessagesession.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
+
+namespace storage {
+
+// Out-of-line empty destructor; anchors the vtable in this translation unit.
+TestVisitorMessageSession::~TestVisitorMessageSession()
+{
+}
+
+// When autoReply is set, send() immediately replies (or fails with
+// autoReplyError); otherwise messages are queued for waitForMessages().
+TestVisitorMessageSession::TestVisitorMessageSession(VisitorThread& t,
+                                                     Visitor& v,
+                                                     const mbus::Error& autoReplyError,
+                                                     bool autoReply)
+    : _autoReplyError(autoReplyError),
+      _autoReply(autoReply),
+      thread(t),
+      visitor(v),
+      pendingCount(0)
+{
+}
+
+// Delivers a reply back to the visitor thread. The pending counter is
+// decremented under the monitor, but the callback is deliberately invoked
+// outside the lock scope.
+void
+TestVisitorMessageSession::reply(mbus::Reply::UP rep) {
+    {
+        vespalib::MonitorGuard guard(_waitMonitor);
+        pendingCount--;
+    }
+    thread.handleMessageBusReply(std::move(rep), visitor);
+}
+
+// Auto-reply mode: reply immediately (success) or fail with the configured
+// error. Queue mode: stash the message and wake any waitForMessages() caller.
+// NOTE(review): in auto-reply mode, reply() re-acquires _waitMonitor while
+// this method still holds it — verify vespalib::Monitor supports recursive
+// locking, otherwise this deadlocks.
+mbus::Result
+TestVisitorMessageSession::send(
+        std::unique_ptr<documentapi::DocumentMessage> message)
+{
+    vespalib::MonitorGuard guard(_waitMonitor);
+    if (_autoReply) {
+        pendingCount++;
+        mbus::Reply::UP rep = message->createReply();
+        // Ownership of `message` moves into the reply here, leaving
+        // `message` empty for the rest of this branch.
+        rep->setMessage(mbus::Message::UP(message.release()));
+        if (_autoReplyError.getCode() == mbus::ErrorCode::NONE) {
+            reply(std::move(rep));
+            return mbus::Result();
+        } else {
+            // NOTE(review): message.release() here returns nullptr because
+            // the message was already released into `rep` above, so the
+            // error Result carries no message (and pendingCount was bumped
+            // without a matching reply).
+            return mbus::Result(_autoReplyError,
+                    std::unique_ptr<mbus::Message>(message.release()));
+        }
+    } else {
+        pendingCount++;
+        sentMessages.push_back(
+                vespalib::LinkedPtr<documentapi::DocumentMessage>(
+                        message.release()));
+        guard.broadcast();
+        return mbus::Result();
+    }
+}
+
+// Blocks until at least msgCount messages have been queued by send(), or
+// throws after a 60-second wall-clock timeout. The loop re-checks the
+// predicate after each (up to 1 s) wait, so spurious wakeups are handled.
+void
+TestVisitorMessageSession::waitForMessages(unsigned int msgCount) {
+    framework::defaultimplementation::RealClock clock;
+    framework::MilliSecTime endTime(
+            clock.getTimeInMillis() + framework::MilliSecTime(60 * 1000));
+
+    vespalib::MonitorGuard guard(_waitMonitor);
+    while (sentMessages.size() < msgCount) {
+        if (clock.getTimeInMillis() > endTime) {
+            throw vespalib::IllegalStateException(
+                    vespalib::make_string("Timed out waiting for %u messages "
+                                          "in test visitor session", msgCount),
+                    VESPA_STRLOC);
+        }
+        guard.wait(1000);
+    }
+}; // NOTE(review): stray ';' after function body — harmless but removable.
+
+}
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h
new file mode 100644
index 00000000000..3ae6ccafb84
--- /dev/null
+++ b/storage/src/tests/storageserver/testvisitormessagesession.h
@@ -0,0 +1,79 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <deque>
+#include <vespa/storage/visiting/visitormessagesession.h>
+#include <vespa/storage/visiting/visitorthread.h>
+#include <vespa/documentapi/messagebus/messages/documentmessage.h>
+#include <vespa/storage/storageserver/priorityconverter.h>
+
+namespace storage {
+
+// Test double for VisitorMessageSession. Either auto-replies to every sent
+// message (optionally with a fixed error) or queues messages so a test can
+// block on waitForMessages() and reply() manually. All shared state is
+// guarded by _waitMonitor.
+class TestVisitorMessageSession : public VisitorMessageSession
+{
+private:
+    vespalib::Monitor _waitMonitor;
+    mbus::Error _autoReplyError;
+    bool _autoReply;
+
+public:
+    typedef std::unique_ptr<TestVisitorMessageSession> UP;
+
+    VisitorThread& thread;
+    Visitor& visitor;
+    // Number of messages sent but not yet replied to.
+    uint32_t pendingCount;
+
+    ~TestVisitorMessageSession();
+
+    // Messages queued in non-auto-reply mode, in send() order.
+    std::deque<vespalib::LinkedPtr<documentapi::DocumentMessage> > sentMessages;
+
+    TestVisitorMessageSession(VisitorThread& t,
+                              Visitor& v,
+                              const mbus::Error& autoReplyError,
+                              bool autoReply);
+
+    void reply(mbus::Reply::UP rep);
+
+    uint32_t pending() { return pendingCount; }
+
+    mbus::Result send(std::unique_ptr<documentapi::DocumentMessage> message);
+
+    // Blocks until msgCount messages are queued; throws on 60 s timeout.
+    void waitForMessages(unsigned int msgCount);
+
+    vespalib::Monitor& getMonitor() { return _waitMonitor; }
+};
+
+// Factory producing TestVisitorMessageSession instances for visitor tests.
+// Configure _autoReplyError / _createAutoReplyVisitorSessions before the
+// visitor starts; created sessions are recorded in _visitorSessions.
+struct TestVisitorMessageSessionFactory : public VisitorMessageSessionFactory
+{
+    vespalib::Lock _accessLock;
+    // NOTE(review): raw observer pointers; ownership is handed to the
+    // caller of createSession(), so entries dangle once sessions die.
+    std::vector<TestVisitorMessageSession*> _visitorSessions;
+    mbus::Error _autoReplyError;
+    bool _createAutoReplyVisitorSessions;
+    PriorityConverter _priConverter;
+
+    TestVisitorMessageSessionFactory(vespalib::stringref configId = "")
+        : _createAutoReplyVisitorSessions(false),
+          _priConverter(configId) {}
+
+    // Creates a session wired to the given visitor/thread, remembers a
+    // non-owning pointer to it, and transfers ownership to the caller.
+    VisitorMessageSession::UP createSession(Visitor& v, VisitorThread& vt) {
+        vespalib::LockGuard lock(_accessLock);
+        TestVisitorMessageSession::UP session(
+                new TestVisitorMessageSession(
+                        vt,
+                        v,
+                        _autoReplyError,
+                        _createAutoReplyVisitorSessions));
+        _visitorSessions.push_back(session.get());
+        return VisitorMessageSession::UP(std::move(session));
+    }
+
+    // Delegates storage-priority -> documentapi-priority mapping.
+    documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const
+    {
+        return _priConverter.toDocumentPriority(storagePriority);
+    }
+
+};
+
+} // storage
+