diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/storageserver |
Publish
Diffstat (limited to 'storage/src/tests/storageserver')
15 files changed, 4696 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/.gitignore b/storage/src/tests/storageserver/.gitignore new file mode 100644 index 00000000000..c4098089f09 --- /dev/null +++ b/storage/src/tests/storageserver/.gitignore @@ -0,0 +1,13 @@ +*.So +*.lo +*.o +.*.swp +.config.log +.depend +.depend.NEW +.deps +.libs +Makefile +filestorage +testrunner +testrunner.core diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt new file mode 100644 index 00000000000..2e327089b4c --- /dev/null +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storage_teststorageserver + SOURCES + communicationmanagertest.cpp + statemanagertest.cpp + documentapiconvertertest.cpp + mergethrottlertest.cpp + testvisitormessagesession.cpp + bouncertest.cpp + bucketintegritycheckertest.cpp + priorityconvertertest.cpp + statereportertest.cpp + changedbucketownershiphandlertest.cpp + DEPENDS + AFTER + storage_storageconfig +) diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp new file mode 100644 index 00000000000..f00e4b19c31 --- /dev/null +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -0,0 +1,285 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <boost/pointer_cast.hpp> +#include <cppunit/extensions/HelperMacros.h> +#include <iostream> +#include <string> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageapi/message/stat.h> +#include <vespa/vdslib/state/nodestate.h> +#include <vespa/storage/storageserver/bouncer.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> +#include <tests/common/dummystoragelink.h> +#include <vespa/storageapi/message/persistence.h> + +namespace storage { + +struct BouncerTest : public CppUnit::TestFixture { + std::unique_ptr<TestServiceLayerApp> _node; + std::unique_ptr<DummyStorageLink> _upper; + Bouncer* _manager; + DummyStorageLink* _lower; + + BouncerTest(); + + void setUp(); + void tearDown(); + + void testFutureTimestamp(); + void testAllowNotifyBucketChangeEvenWhenDistributorDown(); + void rejectLowerPrioritizedFeedMessagesWhenConfigured(); + void doNotRejectHigherPrioritizedFeedMessagesThanConfigured(); + void rejectionThresholdIsExclusive(); + void onlyRejectFeedMessagesWhenConfigured(); + void rejectionIsDisabledByDefaultInConfig(); + void readOnlyOperationsAreNotRejected(); + void internalOperationsAreNotRejected(); + void outOfBoundsConfigValuesThrowException(); + + CPPUNIT_TEST_SUITE(BouncerTest); + CPPUNIT_TEST(testFutureTimestamp); + CPPUNIT_TEST(testAllowNotifyBucketChangeEvenWhenDistributorDown); + CPPUNIT_TEST(rejectLowerPrioritizedFeedMessagesWhenConfigured); + CPPUNIT_TEST(doNotRejectHigherPrioritizedFeedMessagesThanConfigured); + CPPUNIT_TEST(rejectionThresholdIsExclusive); + CPPUNIT_TEST(onlyRejectFeedMessagesWhenConfigured); + CPPUNIT_TEST(rejectionIsDisabledByDefaultInConfig); + CPPUNIT_TEST(readOnlyOperationsAreNotRejected); + CPPUNIT_TEST(internalOperationsAreNotRejected); + CPPUNIT_TEST(outOfBoundsConfigValuesThrowException); + CPPUNIT_TEST_SUITE_END(); + + using Priority = api::StorageMessage::Priority; + + static 
constexpr int RejectionDisabledConfigValue = -1; + + // Note: newThreshold is intentionally int (rather than Priority) in order + // to be able to test out of bounds values. + void configureRejectionThreshold(int newThreshold); + + std::shared_ptr<api::StorageCommand> createDummyFeedMessage( + api::Timestamp timestamp, + Priority priority = 0); + + void assertMessageBouncedWithRejection(); + void assertMessageNotBounced(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(BouncerTest); + +BouncerTest::BouncerTest() + : _node(), + _upper(), + _manager(0), + _lower(0) +{ +} + +void +BouncerTest::setUp() { + try{ + vdstestlib::DirConfig config(getStandardConfig(true)); + _node.reset(new TestServiceLayerApp( + DiskCount(1), NodeIndex(2), config.getConfigId())); + _upper.reset(new DummyStorageLink()); + _manager = new Bouncer(_node->getComponentRegister(), + config.getConfigId()); + _lower = new DummyStorageLink(); + _upper->push_back(std::unique_ptr<StorageLink>(_manager)); + _upper->push_back(std::unique_ptr<StorageLink>(_lower)); + _upper->open(); + } catch (std::exception& e) { + std::cerr << "Failed to static initialize objects: " << e.what() + << "\n"; + } + _node->getClock().setAbsoluteTimeInSeconds(10); +} + +void +BouncerTest::tearDown() { + _manager = 0; + _lower = 0; + _upper->close(); + _upper->flush(); + _upper.reset(0); + _node.reset(0); +} + +std::shared_ptr<api::StorageCommand> +BouncerTest::createDummyFeedMessage(api::Timestamp timestamp, + api::StorageMessage::Priority priority) +{ + auto cmd = std::make_shared<api::RemoveCommand>( + document::BucketId(0), + document::DocumentId("doc:foo:bar"), + timestamp); + cmd->setPriority(priority); + return cmd; +} + +void +BouncerTest::testFutureTimestamp() +{ + + // Fail when future timestamps (more than 5 seconds) are received. 
+ { + _upper->sendDown(createDummyFeedMessage(16 * 1000000)); + + CPPUNIT_ASSERT_EQUAL(1, (int)_upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(0, (int)_upper->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + static_cast<api::RemoveReply&>(*_upper->getReply(0)). + getResult().getResult()); + _upper->reset(); + } + + // Verify that 1 second clock skew is OK + { + _upper->sendDown(createDummyFeedMessage(11 * 1000000)); + + CPPUNIT_ASSERT_EQUAL(0, (int)_upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(1, (int)_lower->getNumCommands()); + _lower->reset(); + } + + // Verify that past is OK + { + _upper->sendDown(createDummyFeedMessage(5 * 1000000)); + + CPPUNIT_ASSERT_EQUAL(1, (int)_lower->getNumCommands()); + } + + +} + +void +BouncerTest::testAllowNotifyBucketChangeEvenWhenDistributorDown() +{ + lib::NodeState state(lib::NodeType::DISTRIBUTOR, lib::State::DOWN); + _node->getNodeStateUpdater().setReportedNodeState(state); + // Trigger Bouncer state update + auto clusterState = std::make_shared<lib::ClusterState>( + "distributor:3 storage:3"); + _node->getNodeStateUpdater().setClusterState(clusterState); + + + document::BucketId bucket(16, 1234); + api::BucketInfo info(0x1, 0x2, 0x3); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, info); + _upper->sendDown(cmd); + + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _lower->getNumCommands()); +} + +void +BouncerTest::assertMessageBouncedWithRejection() +{ + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::REJECTED, + static_cast<api::RemoveReply&>(*_upper->getReply(0)). 
+ getResult().getResult()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumCommands()); +} + +void +BouncerTest::assertMessageNotBounced() +{ + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _lower->getNumCommands()); +} + +void +BouncerTest::configureRejectionThreshold(int newThreshold) +{ + using Builder = vespa::config::content::core::StorBouncerConfigBuilder; + auto config = std::make_unique<Builder>(); + config->feedRejectionPriorityThreshold = newThreshold; + _manager->configure(std::move(config)); +} + +void +BouncerTest::rejectLowerPrioritizedFeedMessagesWhenConfigured() +{ + configureRejectionThreshold(Priority(120)); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(121))); + assertMessageBouncedWithRejection(); +} + +void +BouncerTest::doNotRejectHigherPrioritizedFeedMessagesThanConfigured() +{ + configureRejectionThreshold(Priority(120)); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(119))); + assertMessageNotBounced(); +} + +void +BouncerTest::rejectionThresholdIsExclusive() +{ + configureRejectionThreshold(Priority(120)); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(120))); + assertMessageNotBounced(); +} + +void +BouncerTest::onlyRejectFeedMessagesWhenConfigured() +{ + configureRejectionThreshold(RejectionDisabledConfigValue); + // A message with even the lowest priority should not be rejected. + _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(255))); + assertMessageNotBounced(); +} + +void +BouncerTest::rejectionIsDisabledByDefaultInConfig() +{ + _upper->sendDown(createDummyFeedMessage(11 * 1000000, Priority(255))); + assertMessageNotBounced(); +} + +void +BouncerTest::readOnlyOperationsAreNotRejected() +{ + configureRejectionThreshold(Priority(1)); + // StatBucket is an external operation, but it's not a mutating operation + // and should therefore not be blocked. 
+ auto cmd = std::make_shared<api::StatBucketCommand>( + document::BucketId(16, 5), ""); + cmd->setPriority(Priority(2)); + _upper->sendDown(cmd); + assertMessageNotBounced(); +} + +void +BouncerTest::internalOperationsAreNotRejected() +{ + configureRejectionThreshold(Priority(1)); + document::BucketId bucket(16, 1234); + api::BucketInfo info(0x1, 0x2, 0x3); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, info); + cmd->setPriority(Priority(2)); + _upper->sendDown(cmd); + assertMessageNotBounced(); +} + +void +BouncerTest::outOfBoundsConfigValuesThrowException() +{ + try { + configureRejectionThreshold(256); + CPPUNIT_FAIL("Upper bound violation not caught"); + } catch (config::InvalidConfigException) {} + + try { + configureRejectionThreshold(-2); + CPPUNIT_FAIL("Lower bound violation not caught"); + } catch (config::InvalidConfigException) {} +} + +} // storage + diff --git a/storage/src/tests/storageserver/bucketintegritycheckertest.cpp b/storage/src/tests/storageserver/bucketintegritycheckertest.cpp new file mode 100644 index 00000000000..88a5546b174 --- /dev/null +++ b/storage/src/tests/storageserver/bucketintegritycheckertest.cpp @@ -0,0 +1,302 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <boost/lexical_cast.hpp> +#include <cppunit/extensions/HelperMacros.h> +#include <vespa/log/log.h> +#include <vespa/storage/bucketdb/bucketmanager.h> +#include <vespa/storage/bucketdb/storbucketdb.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/storageserver/bucketintegritychecker.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/persistence.h> +#include <tests/common/testhelper.h> +#include <tests/common/storagelinktest.h> +#include <tests/common/dummystoragelink.h> +#include <vespa/vespalib/io/fileutil.h> +#include <tests/common/teststorageapp.h> + +LOG_SETUP(".test.bucketintegritychecker"); + +namespace storage { + +struct BucketIntegrityCheckerTest : public CppUnit::TestFixture { + std::unique_ptr<vdstestlib::DirConfig> _config; + std::unique_ptr<TestServiceLayerApp> _node; + int _timeout; // Timeout in seconds before aborting + + void setUp() { + _timeout = 60*2; + _config.reset(new vdstestlib::DirConfig(getStandardConfig(true))); + _node.reset(new TestServiceLayerApp(DiskCount(256), + NodeIndex(0), + _config->getConfigId())); + } + + void tearDown() { + LOG(info, "Finished test"); + } + + void testConfig(); + void testBasicFunctionality(); + void testTiming(); + + CPPUNIT_TEST_SUITE(BucketIntegrityCheckerTest); + CPPUNIT_TEST(testConfig); + CPPUNIT_TEST(testBasicFunctionality); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(BucketIntegrityCheckerTest); + +void BucketIntegrityCheckerTest::testConfig() +{ + + // Verify that config is read correctly. Given config should not use + // any default values. 
+ vdstestlib::DirConfig::Config& config( + _config->getConfig("stor-integritychecker")); + config.set("dailycyclestart", "60"); + config.set("dailycyclestop", "360"); + config.set("weeklycycle", "crRc-rc"); + config.set("maxpending", "2"); + config.set("mincycletime", "120"); + config.set("requestdelay", "5"); + + BucketIntegrityChecker checker(_config->getConfigId(), + _node->getComponentRegister()); + checker.setMaxThreadWaitTime(framework::MilliSecTime(10)); + SchedulingOptions& opt(checker.getSchedulingOptions()); + CPPUNIT_ASSERT_EQUAL(60u, opt._dailyCycleStart); + CPPUNIT_ASSERT_EQUAL(360u, opt._dailyCycleStop); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[0]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_CHEAP, opt._dailyStates[1]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_FULL, opt._dailyStates[2]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[3]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::DONT_RUN, opt._dailyStates[4]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::RUN_CHEAP, opt._dailyStates[5]); + CPPUNIT_ASSERT_EQUAL(SchedulingOptions::CONTINUE, opt._dailyStates[6]); + CPPUNIT_ASSERT_EQUAL(2u, opt._maxPendingCount); + CPPUNIT_ASSERT_EQUAL(framework::SecondTime(7200), opt._minCycleTime); + CPPUNIT_ASSERT_EQUAL(framework::SecondTime(5), opt._requestDelay); +} + +namespace { + /** + * Calculate a date based on the following format: + * week<#> <day> <hh>:<mm>:<ss> + * Examples: "week3 mon 00:30:00" + * "week3 tue 04:20:00" + * "week9 thi 14:00:24" + */ + time_t getDate(const std::string& datestring) { + vespalib::string rest(datestring); + int spacePos = rest.find(' '); + uint32_t week = strtoul(rest.substr(4, spacePos-4).c_str(), NULL, 0); + rest = rest.substr(spacePos+1); + vespalib::string wday(rest.substr(0,3)); + rest = rest.substr(4); + uint32_t hours = strtoul(rest.substr(0, 2).c_str(), NULL, 0); + uint32_t minutes = strtoul(rest.substr(3, 2).c_str(), NULL, 0); + uint32_t seconds = 
strtoul(rest.substr(6, 2).c_str(), NULL, 0); + uint32_t day(0); + if (wday == "mon") { day = 1; } + else if (wday == "tue") { day = 2; } + else if (wday == "wed") { day = 3; } + else if (wday == "thi") { day = 4; } + else if (wday == "fri") { day = 5; } + else if (wday == "sat") { day = 6; } + else if (wday == "sun") { day = 0; } + else { assert(false); } + // Create a start time that points to the start of some week. + // A random sunday 00:00:00, which we will use as start of time + struct tm mytime; + memset(&mytime, 0, sizeof(mytime)); + mytime.tm_year = 2008 - 1900; + mytime.tm_mon = 0; + mytime.tm_mday = 1; + mytime.tm_hour = 0; + mytime.tm_min = 0; + mytime.tm_sec = 0; + time_t startTime = timegm(&mytime); + CPPUNIT_ASSERT(gmtime_r(&startTime, &mytime)); + while (mytime.tm_wday != 0) { + ++mytime.tm_mday; + startTime = timegm(&mytime); + CPPUNIT_ASSERT(gmtime_r(&startTime, &mytime)); + } + // Add the wanted values to the start time + time_t resultTime = startTime; + resultTime += week * 7 * 24 * 60 * 60 + + day * 24 * 60 * 60 + + hours * 60 * 60 + + minutes * 60 + + seconds; + // std::cerr << "Time requested " << datestring << ". Got time " + // << framework::SecondTime(resultTime).toString() << "\n"; + return resultTime; + } + + void addBucketToDatabase(TestServiceLayerApp& server, + const document::BucketId& id, uint8_t disk, + uint32_t numDocs, uint32_t crc, uint32_t totalSize) + { + bucketdb::StorageBucketInfo info; + info.setBucketInfo(api::BucketInfo(crc, numDocs, totalSize)); + info.disk = disk; + server.getStorageBucketDatabase().insert(id, info, "foo"); + } + + + /** + * In tests wanting to only have one pending, only add buckets for one disk + * as pending is per disk. If so set singleDisk true. 
+ */ + void addBucketsToDatabase(TestServiceLayerApp& server, bool singleDisk) { + addBucketToDatabase(server, document::BucketId(16, 0x123), 0, + 14, 0x123, 1024); + addBucketToDatabase(server, document::BucketId(16, 0x234), 0, + 18, 0x234, 1024); + addBucketToDatabase(server, document::BucketId(16, 0x345), 0, + 11, 0x345, 2048); + addBucketToDatabase(server, document::BucketId(16, 0x456), 0, + 13, 0x456, 1280); + if (!singleDisk) { + addBucketToDatabase(server, document::BucketId(16, 0x567), 1, + 20, 0x567, 4096); + addBucketToDatabase(server, document::BucketId(16, 0x987), 254, + 8, 0x987, 65536); + } + } +} + +void BucketIntegrityCheckerTest::testBasicFunctionality() +{ + _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:00:00")); + addBucketsToDatabase(*_node, false); + DummyStorageLink* dummyLink = 0; + { + std::unique_ptr<BucketIntegrityChecker> midLink( + new BucketIntegrityChecker("", _node->getComponentRegister())); + BucketIntegrityChecker& checker(*midLink); + checker.setMaxThreadWaitTime(framework::MilliSecTime(10)); + // Setup and start checker + DummyStorageLink topLink; + topLink.push_back(StorageLink::UP(midLink.release())); + checker.push_back(std::unique_ptr<StorageLink>( + dummyLink = new DummyStorageLink())); + checker.getSchedulingOptions()._maxPendingCount = 2; + checker.getSchedulingOptions()._minCycleTime = framework::SecondTime(60 * 60); + topLink.open(); + // Waiting for system to be initialized + FastOS_Thread::Sleep(10); // Give next message chance to come + ASSERT_COMMAND_COUNT(0, *dummyLink); + topLink.doneInit(); + checker.bump(); + // Should have started new run with 2 pending per disk + dummyLink->waitForMessages(4, _timeout); + FastOS_Thread::Sleep(10); // Give 5th message chance to come + ASSERT_COMMAND_COUNT(4, *dummyLink); + RepairBucketCommand *cmd1 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(0).get()); + CPPUNIT_ASSERT_EQUAL(230, (int)cmd1->getPriority()); + CPPUNIT_ASSERT(cmd1); + 
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x234), + cmd1->getBucketId()); + RepairBucketCommand *cmd2 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(1).get()); + CPPUNIT_ASSERT(cmd2); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x456), + cmd2->getBucketId()); + RepairBucketCommand *cmd3 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(2).get()); + CPPUNIT_ASSERT(cmd3); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x567), + cmd3->getBucketId()); + RepairBucketCommand *cmd4 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(3).get()); + CPPUNIT_ASSERT(cmd4); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x987), + cmd4->getBucketId()); + + // Answering a message on disk with no more buckets does not trigger new + std::shared_ptr<RepairBucketReply> reply1( + new RepairBucketReply(*cmd3)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply1)); + FastOS_Thread::Sleep(10); // Give next message chance to come + ASSERT_COMMAND_COUNT(4, *dummyLink); + // Answering a message on disk with more buckets trigger new repair + std::shared_ptr<RepairBucketReply> reply2( + new RepairBucketReply(*cmd2)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply2)); + dummyLink->waitForMessages(5, _timeout); + FastOS_Thread::Sleep(10); // Give 6th message chance to come + ASSERT_COMMAND_COUNT(5, *dummyLink); + RepairBucketCommand *cmd5 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(4).get()); + CPPUNIT_ASSERT(cmd5); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x345), + cmd5->getBucketId()); + // Fail a repair, causing it to be resent later, but first continue + // with other bucket. 
+ std::shared_ptr<RepairBucketReply> reply3( + new RepairBucketReply(*cmd1)); + reply3->setResult(api::ReturnCode(api::ReturnCode::IGNORED)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply3)); + dummyLink->waitForMessages(6, _timeout); + FastOS_Thread::Sleep(10); // Give 7th message chance to come + ASSERT_COMMAND_COUNT(6, *dummyLink); + RepairBucketCommand *cmd6 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(5).get()); + CPPUNIT_ASSERT(cmd6); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x123), + cmd6->getBucketId()); + // Fail a repair with not found. That is an acceptable return code. + // (No more requests as this was last for that disk) + std::shared_ptr<RepairBucketReply> reply4( + new RepairBucketReply(*cmd4)); + reply3->setResult(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply4)); + FastOS_Thread::Sleep(10); // Give 7th message chance to come + ASSERT_COMMAND_COUNT(6, *dummyLink); + + // Send a repair reply that actually have corrected the bucket. + api::BucketInfo newInfo(0x3456, 4, 8192); + std::shared_ptr<RepairBucketReply> reply5( + new RepairBucketReply(*cmd5, newInfo)); + reply5->setAltered(true); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply5)); + + // Finish run. 
New iteration should not start yet as min + // cycle time has not passed + std::shared_ptr<RepairBucketReply> reply6( + new RepairBucketReply(*cmd6)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply6)); + dummyLink->waitForMessages(7, _timeout); + ASSERT_COMMAND_COUNT(7, *dummyLink); + RepairBucketCommand *cmd7 = dynamic_cast<RepairBucketCommand*>( + dummyLink->getCommand(6).get()); + CPPUNIT_ASSERT(cmd7); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x234), + cmd7->getBucketId()); + std::shared_ptr<RepairBucketReply> reply7( + new RepairBucketReply(*cmd7)); + CPPUNIT_ASSERT(StorageLinkTest::callOnUp(checker, reply7)); + FastOS_Thread::Sleep(10); // Give 8th message chance to come + ASSERT_COMMAND_COUNT(7, *dummyLink); + + // Still not time for next iteration + dummyLink->reset(); + _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:59:59")); + FastOS_Thread::Sleep(10); // Give new run chance to start + ASSERT_COMMAND_COUNT(0, *dummyLink); + + // Pass time until next cycle should start + dummyLink->reset(); + _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 01:00:00")); + dummyLink->waitForMessages(4, _timeout); + ASSERT_COMMAND_COUNT(4, *dummyLink); + } +} + +} // storage diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp new file mode 100644 index 00000000000..3b83d71d8f3 --- /dev/null +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -0,0 +1,648 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <vespa/document/base/testdocman.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/storage/bucketdb/storbucketdb.h> +#include <vespa/storage/persistence/messages.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/multioperation.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> +#include <tests/common/dummystoragelink.h> +#include <vespa/storage/storageserver/changedbucketownershiphandler.h> +#include <memory> + +namespace storage { + +class ChangedBucketOwnershipHandlerTest : public CppUnit::TestFixture +{ + std::unique_ptr<TestServiceLayerApp> _app; + std::unique_ptr<DummyStorageLink> _top; + ChangedBucketOwnershipHandler* _handler; + DummyStorageLink* _bottom; + document::TestDocMan _testDocRepo; + + CPPUNIT_TEST_SUITE(ChangedBucketOwnershipHandlerTest); + CPPUNIT_TEST(testEnumerateBucketsBelongingOnChangedNodes); + CPPUNIT_TEST(testNoPreExistingClusterState); + CPPUNIT_TEST(testNoAvailableDistributorsInCurrentState); + CPPUNIT_TEST(testNoAvailableDistributorsInCurrentAndNewState); + CPPUNIT_TEST(testDownEdgeToNoAvailableDistributors); + CPPUNIT_TEST(testOwnershipChangedOnDistributorUpEdge); + CPPUNIT_TEST(testDistributionConfigChangeUpdatesOwnership); + CPPUNIT_TEST(testAbortOpsWhenNoClusterStateSet); + CPPUNIT_TEST(testAbortOutdatedSplit); + CPPUNIT_TEST(testAbortOutdatedJoin); + CPPUNIT_TEST(testAbortOutdatedSetBucketState); + CPPUNIT_TEST(testAbortOutdatedCreateBucket); + CPPUNIT_TEST(testAbortOutdatedDeleteBucket); + CPPUNIT_TEST(testAbortOutdatedMergeBucket); + CPPUNIT_TEST(testAbortOutdatedRemoveLocation); + CPPUNIT_TEST(testIdealStateAbortsAreConfigurable); + CPPUNIT_TEST(testAbortOutdatedPutOperation); + 
CPPUNIT_TEST(testAbortOutdatedMultiOperation); + CPPUNIT_TEST(testAbortOutdatedUpdateCommand); + CPPUNIT_TEST(testAbortOutdatedRemoveCommand); + CPPUNIT_TEST(testAbortOutdatedRevertCommand); + CPPUNIT_TEST(testIdealStateAbortUpdatesMetric); + CPPUNIT_TEST(testExternalLoadOpAbortUpdatesMetric); + CPPUNIT_TEST(testExternalLoadOpAbortsAreConfigurable); + CPPUNIT_TEST_SUITE_END(); + + // TODO test: down edge triggered on cluster state with cluster down? + + std::vector<document::BucketId> insertBuckets( + uint32_t numBuckets, + uint16_t wantedOwner, + const lib::ClusterState& state); + + std::shared_ptr<api::SetSystemStateCommand> createStateCmd( + const lib::ClusterState& state) const + { + return std::make_shared<api::SetSystemStateCommand>(state); + } + + std::shared_ptr<api::SetSystemStateCommand> createStateCmd( + const std::string& stateStr) const + { + return createStateCmd(lib::ClusterState(stateStr)); + } + + void applyDistribution(Redundancy, NodeCount); + void applyClusterState(const lib::ClusterState&); + + document::BucketId nextOwnedBucket( + uint16_t wantedOwner, + const lib::ClusterState& state, + const document::BucketId& lastId) const; + + document::BucketId getBucketToAbort() const; + document::BucketId getBucketToAllow() const; + + void sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex); + + template <typename MsgType, typename... MsgParams> + bool changeAbortsMessage(MsgParams&&... 
params); + + lib::ClusterState getDefaultTestClusterState() const { + return lib::ClusterState("distributor:4 storage:1"); + } + +public: + void testEnumerateBucketsBelongingOnChangedNodes(); + void testNoPreExistingClusterState(); + void testNoAvailableDistributorsInCurrentState(); + void testNoAvailableDistributorsInCurrentAndNewState(); + void testDownEdgeToNoAvailableDistributors(); + void testOwnershipChangedOnDistributorUpEdge(); + void testDistributionConfigChangeUpdatesOwnership(); + void testAbortOpsWhenNoClusterStateSet(); + void testAbortOutdatedSplit(); + void testAbortOutdatedJoin(); + void testAbortOutdatedSetBucketState(); + void testAbortOutdatedCreateBucket(); + void testAbortOutdatedDeleteBucket(); + void testAbortOutdatedMergeBucket(); + void testAbortOutdatedRemoveLocation(); + void testIdealStateAbortsAreConfigurable(); + void testAbortOutdatedPutOperation(); + void testAbortOutdatedMultiOperation(); + void testAbortOutdatedUpdateCommand(); + void testAbortOutdatedRemoveCommand(); + void testAbortOutdatedRevertCommand(); + void testIdealStateAbortUpdatesMetric(); + void testExternalLoadOpAbortUpdatesMetric(); + void testExternalLoadOpAbortsAreConfigurable(); + + void setUp(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(ChangedBucketOwnershipHandlerTest); + +document::BucketId +ChangedBucketOwnershipHandlerTest::nextOwnedBucket( + uint16_t wantedOwner, + const lib::ClusterState& state, + const document::BucketId& lastId) const +{ + uint32_t idx(lastId.getId() + 1); + while (true) { + document::BucketId candidate(16, idx); + uint16_t owner(_app->getDistribution()->getIdealDistributorNode( + state, candidate)); + if (owner == wantedOwner) { + return candidate; + } + ++idx; + } + assert(!"should never get here"); +} + +std::vector<document::BucketId> +ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets, + uint16_t wantedOwner, + const lib::ClusterState& state) +{ + std::vector<document::BucketId> inserted; + document::BucketId bucket; + 
while (inserted.size() < numBuckets) { + bucket = nextOwnedBucket(wantedOwner, state, bucket); + + bucketdb::StorageBucketInfo sbi; + sbi.setBucketInfo(api::BucketInfo(1, 2, 3)); + sbi.disk = 0; + _app->getStorageBucketDatabase().insert(bucket, sbi, "test"); + inserted.push_back(bucket); + } + return inserted; +} + +void +ChangedBucketOwnershipHandlerTest::setUp() +{ + vdstestlib::DirConfig config(getStandardConfig(true)); + + _app.reset(new TestServiceLayerApp); + _top.reset(new DummyStorageLink); + _handler = new ChangedBucketOwnershipHandler(config.getConfigId(), + _app->getComponentRegister()); + _top->push_back(std::unique_ptr<StorageLink>(_handler)); + _bottom = new DummyStorageLink; + _handler->push_back(std::unique_ptr<StorageLink>(_bottom)); + _top->open(); + + // Ensure we're not dependent on config schema default values. + std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> pconfig( + new vespa::config::content::PersistenceConfigBuilder); + pconfig->abortOutdatedMutatingIdealStateOps = true; + pconfig->abortOutdatedMutatingExternalLoadOps = true; + _handler->configure(std::move(pconfig)); +} + +namespace { + +template <typename Set, typename K> +bool has(const Set& s, const K& key) { + return s.find(key) != s.end(); +} + +template <typename Vec> +bool +hasAbortedAllOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) +{ + for (auto& b : v) { + if (!cmd->shouldAbort(b)) { + return false; + } + } + return true; +} + +template <typename Vec> +bool +hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) +{ + for (auto& b : v) { + if (cmd->shouldAbort(b)) { + return false; + } + } + return true; +} + +bool +hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) { + if (link.getNumCommands() != 1) { + std::cerr << "expected 1 command, found" + << link.getNumCommands() << "\n"; + } + api::SetSystemStateCommand::SP cmd( + std::dynamic_pointer_cast<api::SetSystemStateCommand>( + link.getCommand(0))); + return (cmd.get() 
!= 0); +} + +} + +void +ChangedBucketOwnershipHandlerTest::applyDistribution( + Redundancy redundancy, NodeCount nodeCount) +{ + _app->setDistribution(redundancy, nodeCount); + _handler->storageDistributionChanged(); +} + +void +ChangedBucketOwnershipHandlerTest::applyClusterState( + const lib::ClusterState& state) +{ + _app->setClusterState(state); + _handler->reloadClusterState(); +} + +void +ChangedBucketOwnershipHandlerTest::testEnumerateBucketsBelongingOnChangedNodes() +{ + lib::ClusterState stateBefore("distributor:4 storage:1"); + applyDistribution(Redundancy(1), NodeCount(4)); + applyClusterState(stateBefore); + auto node1Buckets(insertBuckets(2, 1, stateBefore)); + auto node3Buckets(insertBuckets(2, 3, stateBefore)); + // Add some buckets that will not be part of the change set + auto node0Buckets(insertBuckets(3, 0, stateBefore)); + auto node2Buckets(insertBuckets(2, 2, stateBefore)); + + _top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1")); + // TODO: refactor into own function + CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands()); + AbortBucketOperationsCommand::SP cmd( + std::dynamic_pointer_cast<AbortBucketOperationsCommand>( + _bottom->getCommand(0))); + CPPUNIT_ASSERT(cmd.get() != 0); + + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets)); + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node3Buckets)); + CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node0Buckets)); + CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node2Buckets)); + + // Handler must swallow abort replies + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + CPPUNIT_ASSERT_EQUAL(size_t(0), _top->getNumReplies()); +} + +void +ChangedBucketOwnershipHandlerTest::testNoPreExistingClusterState() +{ + applyDistribution(Redundancy(1), NodeCount(4)); + lib::ClusterState stateBefore("distributor:4 storage:1"); + insertBuckets(2, 1, stateBefore); + insertBuckets(3, 0, stateBefore); + insertBuckets(2, 2, stateBefore); + + _top->sendDown(createStateCmd("distributor:4 .1.s:d 
.3.s:d storage:1")); + CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom)); +} + +/** + * When current state has no distributors and we receive a state with one or + * more distributors, we do not send any abort messages since this should + * already have been done on the down-edge. + */ +void +ChangedBucketOwnershipHandlerTest::testNoAvailableDistributorsInCurrentState() +{ + applyDistribution(Redundancy(1), NodeCount(3)); + lib::ClusterState insertedState("distributor:3 storage:1"); + insertBuckets(2, 0, insertedState); + insertBuckets(2, 1, insertedState); + insertBuckets(2, 2, insertedState); + lib::ClusterState downState("distributor:3 .0.s:d .1.s:d .2.s:d storage:1"); + _app->setClusterState(downState); + + _top->sendDown(createStateCmd("distributor:3 .1.s:d storage:1")); + CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom)); +} + +void +ChangedBucketOwnershipHandlerTest::testNoAvailableDistributorsInCurrentAndNewState() +{ + applyDistribution(Redundancy(1), NodeCount(3)); + lib::ClusterState insertedState("distributor:3 storage:1"); + insertBuckets(2, 0, insertedState); + insertBuckets(2, 1, insertedState); + insertBuckets(2, 2, insertedState); + lib::ClusterState stateBefore("distributor:3 .0.s:s .1.s:s .2.s:d storage:1"); + applyClusterState(stateBefore); + lib::ClusterState downState("distributor:3 .0.s:d .1.s:d .2.s:d storage:1"); + + _top->sendDown(createStateCmd(downState)); + CPPUNIT_ASSERT(hasOnlySetSystemStateCmdQueued(*_bottom)); +} + +void +ChangedBucketOwnershipHandlerTest::testDownEdgeToNoAvailableDistributors() +{ + lib::ClusterState insertedState("distributor:3 storage:1"); + applyDistribution(Redundancy(1), NodeCount(3)); + applyClusterState(insertedState); + auto node0Buckets(insertBuckets(2, 0, insertedState)); + auto node1Buckets(insertBuckets(2, 1, insertedState)); + auto node2Buckets(insertBuckets(2, 2, insertedState)); + lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1"); + + 
_top->sendDown(createStateCmd(downState)); + // TODO: refactor into own function + CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands()); + AbortBucketOperationsCommand::SP cmd( + std::dynamic_pointer_cast<AbortBucketOperationsCommand>( + _bottom->getCommand(0))); + CPPUNIT_ASSERT(cmd.get() != 0); + + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node0Buckets)); + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets)); + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node2Buckets)); +} + +void +ChangedBucketOwnershipHandlerTest::testOwnershipChangedOnDistributorUpEdge() +{ + lib::ClusterState stateBefore( + "version:10 distributor:4 .1.s:d storage:4 .1.s:d"); + lib::ClusterState stateAfter( + "version:11 distributor:4 .1.t:1369990247 storage:4 .1.s:d"); + applyDistribution(Redundancy(1), NodeCount(4)); + applyClusterState(stateBefore); + // Add buckets that will belong to distributor 1 after it has come back up + auto node1Buckets(insertBuckets(2, 1, stateAfter)); + // Add some buckets that will not be part of the change set + auto node0Buckets(insertBuckets(3, 0, stateAfter)); + auto node2Buckets(insertBuckets(2, 2, stateAfter)); + + _top->sendDown(createStateCmd(stateAfter)); + // TODO: refactor into own function + CPPUNIT_ASSERT_EQUAL(size_t(2), _bottom->getNumCommands()); + AbortBucketOperationsCommand::SP cmd( + std::dynamic_pointer_cast<AbortBucketOperationsCommand>( + _bottom->getCommand(0))); + CPPUNIT_ASSERT(cmd.get() != 0); + + CPPUNIT_ASSERT(hasAbortedAllOf(cmd, node1Buckets)); + CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node0Buckets)); + CPPUNIT_ASSERT(hasAbortedNoneOf(cmd, node2Buckets)); + + // Handler must swallow abort replies + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + CPPUNIT_ASSERT_EQUAL(size_t(0), _top->getNumReplies()); +} + +void +ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket( + uint16_t fromDistributorIndex) +{ + document::BucketId bucket(16, 6786); + auto msg = std::make_shared<api::CreateBucketCommand>(bucket); + 
msg->setSourceIndex(fromDistributorIndex); + + _top->sendDown(msg); + std::vector<api::StorageMessage::SP> replies(_top->getRepliesOnce()); + CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size()); + api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*replies[0])); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + reply.getResult().getResult()); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOpsWhenNoClusterStateSet() +{ + sendAndExpectAbortedCreateBucket(1); +} + +void +ChangedBucketOwnershipHandlerTest::testDistributionConfigChangeUpdatesOwnership() +{ + lib::ClusterState insertedState("distributor:3 storage:1"); + applyClusterState(insertedState); + applyDistribution(Redundancy(1), NodeCount(3)); + + // Apply new distribution config containing only 1 distributor, meaning + // any messages sent from >1 must be aborted. + applyDistribution(Redundancy(1), NodeCount(1)); + sendAndExpectAbortedCreateBucket(2); +} + +/** + * Generate and dispatch a message of the given type with the provided + * aruments as if that message was sent from distributor 1. Messages will + * be checked as if the state contains 4 distributors in Up state. This + * means that it suffices to send in a message with a bucket that is not + * owned by distributor 1 in this state to trigger an abort. + */ +template <typename MsgType, typename... MsgParams> +bool +ChangedBucketOwnershipHandlerTest::changeAbortsMessage(MsgParams&&... params) +{ + auto msg = std::make_shared<MsgType>(std::forward<MsgParams>(params)...); + msg->setSourceIndex(1); + + applyDistribution(Redundancy(1), NodeCount(4)); + applyClusterState(getDefaultTestClusterState()); + + _top->sendDown(msg); + std::vector<api::StorageMessage::SP> replies(_top->getRepliesOnce()); + // Test is single-threaded, no need to do any waiting. 
+ if (replies.empty()) { + return false; + } else { + CPPUNIT_ASSERT_EQUAL(size_t(1), replies.size()); + // Make sure the message was actually aborted and not bounced with + // some other arbitrary failure code. + api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*replies[0])); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED, + reply.getResult().getResult()); + return true; + } +} + +/** + * Returns a bucket that is not owned by the sending distributor (1). More + * specifically, it returns a bucket that is owned by distributor 2. + */ +document::BucketId +ChangedBucketOwnershipHandlerTest::getBucketToAbort() const +{ + lib::ClusterState state(getDefaultTestClusterState()); + return nextOwnedBucket(2, state, document::BucketId()); +} + +/** + * Returns a bucket that _is_ owned by distributor 1 and should thus be + * allowed through. + */ +document::BucketId +ChangedBucketOwnershipHandlerTest::getBucketToAllow() const +{ + lib::ClusterState state(getDefaultTestClusterState()); + return nextOwnedBucket(1, state, document::BucketId()); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedSplit() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::SplitBucketCommand>( + getBucketToAbort())); + CPPUNIT_ASSERT(!changeAbortsMessage<api::SplitBucketCommand>( + getBucketToAllow())); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedJoin() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::JoinBucketsCommand>( + getBucketToAbort())); + CPPUNIT_ASSERT(!changeAbortsMessage<api::JoinBucketsCommand>( + getBucketToAllow())); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedSetBucketState() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::SetBucketStateCommand>( + getBucketToAbort(), api::SetBucketStateCommand::ACTIVE)); + CPPUNIT_ASSERT(!changeAbortsMessage<api::SetBucketStateCommand>( + getBucketToAllow(), api::SetBucketStateCommand::ACTIVE)); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedCreateBucket() +{ + 
CPPUNIT_ASSERT(changeAbortsMessage<api::CreateBucketCommand>( + getBucketToAbort())); + CPPUNIT_ASSERT(!changeAbortsMessage<api::CreateBucketCommand>( + getBucketToAllow())); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedDeleteBucket() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::DeleteBucketCommand>( + getBucketToAbort())); + CPPUNIT_ASSERT(!changeAbortsMessage<api::DeleteBucketCommand>( + getBucketToAllow())); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedMergeBucket() +{ + std::vector<api::MergeBucketCommand::Node> nodes; + CPPUNIT_ASSERT(changeAbortsMessage<api::MergeBucketCommand>( + getBucketToAbort(), nodes, 0)); + CPPUNIT_ASSERT(!changeAbortsMessage<api::MergeBucketCommand>( + getBucketToAllow(), nodes, 0)); +} + +/** + * RemoveLocation is technically an external load class, but since it's also + * used as the backing operation for GC we have to treat it as if it were an + * ideal state operation class. + */ +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedRemoveLocation() +{ + std::vector<api::MergeBucketCommand::Node> nodes; + CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveLocationCommand>( + "foo", getBucketToAbort())); + CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveLocationCommand>( + "foo", getBucketToAllow())); +} + +void +ChangedBucketOwnershipHandlerTest::testIdealStateAbortsAreConfigurable() +{ + std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> config( + new vespa::config::content::PersistenceConfigBuilder); + config->abortOutdatedMutatingIdealStateOps = false; + _handler->configure(std::move(config)); + // Should not abort operation, even when ownership has changed. 
+ CPPUNIT_ASSERT(!changeAbortsMessage<api::CreateBucketCommand>( + getBucketToAbort())); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedPutOperation() +{ + document::Document::SP doc(_testDocRepo.createRandomDocumentAtLocation(1)); + CPPUNIT_ASSERT(changeAbortsMessage<api::PutCommand>( + getBucketToAbort(), doc, api::Timestamp(1234))); + CPPUNIT_ASSERT(!changeAbortsMessage<api::PutCommand>( + getBucketToAllow(), doc, api::Timestamp(1234))); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedMultiOperation() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::MultiOperationCommand>( + _testDocRepo.getTypeRepoSP(), getBucketToAbort(), 1024)); + CPPUNIT_ASSERT(!changeAbortsMessage<api::MultiOperationCommand>( + _testDocRepo.getTypeRepoSP(), getBucketToAllow(), 1024)); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedUpdateCommand() +{ + const document::DocumentType* docType(_testDocRepo.getTypeRepo() + .getDocumentType("testdoctype1")); + document::DocumentId docId("id:foo:testdoctype1::bar"); + document::DocumentUpdate::SP update( + std::make_shared<document::DocumentUpdate>(*docType, docId)); + CPPUNIT_ASSERT(changeAbortsMessage<api::UpdateCommand>( + getBucketToAbort(), update, api::Timestamp(1234))); + CPPUNIT_ASSERT(!changeAbortsMessage<api::UpdateCommand>( + getBucketToAllow(), update, api::Timestamp(1234))); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedRemoveCommand() +{ + document::DocumentId docId("id:foo:testdoctype1::bar"); + CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>( + getBucketToAbort(), docId, api::Timestamp(1234))); + CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>( + getBucketToAllow(), docId, api::Timestamp(1234))); +} + +void +ChangedBucketOwnershipHandlerTest::testAbortOutdatedRevertCommand() +{ + std::vector<api::Timestamp> timestamps; + CPPUNIT_ASSERT(changeAbortsMessage<api::RevertCommand>( + getBucketToAbort(), timestamps)); + 
CPPUNIT_ASSERT(!changeAbortsMessage<api::RevertCommand>( + getBucketToAllow(), timestamps)); +} + +void +ChangedBucketOwnershipHandlerTest::testIdealStateAbortUpdatesMetric() +{ + CPPUNIT_ASSERT(changeAbortsMessage<api::SplitBucketCommand>( + getBucketToAbort())); + CPPUNIT_ASSERT_EQUAL( + uint64_t(1), + _handler->getMetrics().idealStateOpsAborted.getValue()); + CPPUNIT_ASSERT_EQUAL( + uint64_t(0), + _handler->getMetrics().externalLoadOpsAborted.getValue()); +} + +void +ChangedBucketOwnershipHandlerTest::testExternalLoadOpAbortUpdatesMetric() +{ + document::DocumentId docId("id:foo:testdoctype1::bar"); + CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>( + getBucketToAbort(), docId, api::Timestamp(1234))); + CPPUNIT_ASSERT_EQUAL( + uint64_t(0), + _handler->getMetrics().idealStateOpsAborted.getValue()); + CPPUNIT_ASSERT_EQUAL( + uint64_t(1), + _handler->getMetrics().externalLoadOpsAborted.getValue()); +} + +void +ChangedBucketOwnershipHandlerTest::testExternalLoadOpAbortsAreConfigurable() +{ + std::unique_ptr<vespa::config::content::PersistenceConfigBuilder> config( + new vespa::config::content::PersistenceConfigBuilder); + config->abortOutdatedMutatingExternalLoadOps = false; + _handler->configure(std::move(config)); + // Should not abort operation, even when ownership has changed. + document::DocumentId docId("id:foo:testdoctype1::bar"); + CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>( + getBucketToAbort(), docId, api::Timestamp(1234))); +} + +} // storage diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp new file mode 100644 index 00000000000..fe062a9ee30 --- /dev/null +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <vespa/storage/storageserver/communicationmanager.h> + +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { + +struct CommunicationManagerTest : public CppUnit::TestFixture { + void testSimple(); + void testDistPendingLimitConfigsArePropagatedToMessageBus(); + void testStorPendingLimitConfigsArePropagatedToMessageBus(); + void testCommandsAreDequeuedInPriorityOrder(); + void testRepliesAreDequeuedInFifoOrder(); + + static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60; + + void doTestConfigPropagation(bool isContentNode); + + std::shared_ptr<api::StorageCommand> createDummyCommand( + api::StorageMessage::Priority priority) + { + auto cmd = std::make_shared<api::GetCommand>( + document::BucketId(0), + document::DocumentId("doc::mydoc"), + "[all]"); + cmd->setAddress(api::StorageMessageAddress( + "storage", lib::NodeType::STORAGE, 1)); + cmd->setPriority(priority); + return cmd; + } + + CPPUNIT_TEST_SUITE(CommunicationManagerTest); + CPPUNIT_TEST(testSimple); + CPPUNIT_TEST(testDistPendingLimitConfigsArePropagatedToMessageBus); + CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus); + CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder); + CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(CommunicationManagerTest); + +void CommunicationManagerTest::testSimple() +{ + mbus::Slobrok slobrok; + vdstestlib::DirConfig distConfig(getStandardConfig(false)); + vdstestlib::DirConfig storConfig(getStandardConfig(true)); + 
distConfig.getConfig("stor-server").set("node_index", "1"); + storConfig.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(distConfig, slobrok); + addSlobrokConfig(storConfig, slobrok); + + // Set up a "distributor" and a "storage" node with communication + // managers and a dummy storage link below we can use for testing. + TestServiceLayerApp storNode(storConfig.getConfigId()); + TestDistributorApp distNode(distConfig.getConfigId()); + + CommunicationManager distributor(distNode.getComponentRegister(), + distConfig.getConfigId()); + CommunicationManager storage(storNode.getComponentRegister(), + storConfig.getConfigId()); + DummyStorageLink *distributorLink = new DummyStorageLink(); + DummyStorageLink *storageLink = new DummyStorageLink(); + distributor.push_back(std::unique_ptr<StorageLink>(distributorLink)); + storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + distributor.open(); + storage.open(); + + FastOS_Thread::Sleep(1000); + + // Send a message through from distributor to storage + std::shared_ptr<api::StorageCommand> cmd( + new api::GetCommand( + document::BucketId(0), document::DocumentId("doc::mydoc"), "[all]")); + cmd->setAddress(api::StorageMessageAddress( + "storage", lib::NodeType::STORAGE, 1)); + distributorLink->sendUp(cmd); + storageLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC); + CPPUNIT_ASSERT(storageLink->getNumCommands() > 0); + std::shared_ptr<api::StorageCommand> cmd2( + std::dynamic_pointer_cast<api::StorageCommand>( + storageLink->getCommand(0))); + CPPUNIT_ASSERT_EQUAL( + vespalib::string("doc::mydoc"), + static_cast<api::GetCommand&>(*cmd2).getDocumentId().toString()); + // Reply to the message + std::shared_ptr<api::StorageReply> reply(cmd2->makeReply().release()); + storageLink->sendUp(reply); + storageLink->sendUp(reply); + distributorLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC); + CPPUNIT_ASSERT(distributorLink->getNumCommands() > 0); + std::shared_ptr<api::GetReply> reply2( + 
std::dynamic_pointer_cast<api::GetReply>( + distributorLink->getCommand(0))); + CPPUNIT_ASSERT_EQUAL(false, reply2->wasFound()); +} + +void +CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) +{ + mbus::Slobrok slobrok; + vdstestlib::DirConfig config(getStandardConfig(isContentNode)); + config.getConfig("stor-server").set("node_index", "1"); + auto& cfg = config.getConfig("stor-communicationmanager"); + cfg.set("mbus_content_node_max_pending_count", "12345"); + cfg.set("mbus_content_node_max_pending_size", "555666"); + cfg.set("mbus_distributor_node_max_pending_count", "6789"); + cfg.set("mbus_distributor_node_max_pending_size", "777888"); + addSlobrokConfig(config, slobrok); + + std::unique_ptr<TestStorageApp> node; + if (isContentNode) { + node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); + } else { + node = std::make_unique<TestDistributorApp>(config.getConfigId()); + } + + CommunicationManager commMgr(node->getComponentRegister(), + config.getConfigId()); + DummyStorageLink *storageLink = new DummyStorageLink(); + commMgr.push_back(std::unique_ptr<StorageLink>(storageLink)); + commMgr.open(); + + // Outer type is RPCMessageBus, which wraps regular MessageBus. + auto& mbus = commMgr.getMessageBus().getMessageBus(); + if (isContentNode) { + CPPUNIT_ASSERT_EQUAL(uint32_t(12345), mbus.getMaxPendingCount()); + CPPUNIT_ASSERT_EQUAL(uint32_t(555666), mbus.getMaxPendingSize()); + } else { + CPPUNIT_ASSERT_EQUAL(uint32_t(6789), mbus.getMaxPendingCount()); + CPPUNIT_ASSERT_EQUAL(uint32_t(777888), mbus.getMaxPendingSize()); + } + + // Test live reconfig of limits. 
+ using ConfigBuilder + = vespa::config::content::core::StorCommunicationmanagerConfigBuilder; + auto liveCfg = std::make_unique<ConfigBuilder>(); + liveCfg->mbusContentNodeMaxPendingCount = 777777; + liveCfg->mbusDistributorNodeMaxPendingCount = 999999; + + commMgr.configure(std::move(liveCfg)); + if (isContentNode) { + CPPUNIT_ASSERT_EQUAL(uint32_t(777777), mbus.getMaxPendingCount()); + } else { + CPPUNIT_ASSERT_EQUAL(uint32_t(999999), mbus.getMaxPendingCount()); + } +} + +void +CommunicationManagerTest::testDistPendingLimitConfigsArePropagatedToMessageBus() +{ + doTestConfigPropagation(false); +} + +void +CommunicationManagerTest::testStorPendingLimitConfigsArePropagatedToMessageBus() +{ + doTestConfigPropagation(true); +} + +void +CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder() +{ + mbus::Slobrok slobrok; + vdstestlib::DirConfig storConfig(getStandardConfig(true)); + storConfig.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(storConfig, slobrok); + TestServiceLayerApp storNode(storConfig.getConfigId()); + + CommunicationManager storage(storNode.getComponentRegister(), + storConfig.getConfigId()); + DummyStorageLink *storageLink = new DummyStorageLink(); + storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + + // Message dequeing does not start before we invoke `open` on the storage + // link chain, so we enqueue messages in randomized priority order before + // doing so. After starting the thread, we should then get messages down + // the chain in a deterministic, prioritized order. + // Lower number == higher priority. 
+ std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; + for (auto pri : pris) { + storage.enqueue(createDummyCommand(pri)); + } + storage.open(); + storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); + + std::sort(pris.begin(), pris.end()); + for (size_t i = 0; i < pris.size(); ++i) { + // Casting is just to avoid getting mismatched values printed to the + // output verbatim as chars. + CPPUNIT_ASSERT_EQUAL( + uint32_t(pris[i]), + uint32_t(storageLink->getCommand(i)->getPriority())); + } +} + +void +CommunicationManagerTest::testRepliesAreDequeuedInFifoOrder() +{ + mbus::Slobrok slobrok; + vdstestlib::DirConfig storConfig(getStandardConfig(true)); + storConfig.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(storConfig, slobrok); + TestServiceLayerApp storNode(storConfig.getConfigId()); + + CommunicationManager storage(storNode.getComponentRegister(), + storConfig.getConfigId()); + DummyStorageLink *storageLink = new DummyStorageLink(); + storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + + std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; + for (auto pri : pris) { + storage.enqueue(createDummyCommand(pri)->makeReply()); + } + storage.open(); + storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); + + // Want FIFO order for replies, not priority-sorted order. + for (size_t i = 0; i < pris.size(); ++i) { + CPPUNIT_ASSERT_EQUAL( + uint32_t(pris[i]), + uint32_t(storageLink->getCommand(i)->getPriority())); + } +} + +} // storage diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp new file mode 100644 index 00000000000..69083352c4a --- /dev/null +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -0,0 +1,529 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <cppunit/extensions/HelperMacros.h> +#include <vespa/document/config/config-documenttypes.h> +#include <vespa/document/datatype/datatype.h> +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/storage/storageserver/documentapiconverter.h> +#include <vespa/storageapi/message/batch.h> +#include <vespa/storageapi/message/datagram.h> +#include <vespa/storageapi/message/multioperation.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/visitor.h> +#include <vespa/vdslib/container/writabledocumentlist.h> + +using document::DataType; +using document::DocIdString; +using document::Document; +using document::DocumentId; +using document::DocumentTypeRepo; +using document::readDocumenttypesConfig; + +namespace storage { + +struct DocumentApiConverterTest : public CppUnit::TestFixture +{ + std::unique_ptr<DocumentApiConverter> _converter; + const DocumentTypeRepo::SP _repo; + const DataType& _html_type; + + DocumentApiConverterTest() + : _repo(new DocumentTypeRepo(readDocumenttypesConfig( + "config-doctypes.cfg"))), + _html_type(*_repo->getDocumentType("text/html")) + { + } + + void setUp() { + _converter.reset(new DocumentApiConverter("raw:")); + }; + + void testPut(); + void testForwardedPut(); + void testRemove(); + void testGet(); + void testCreateVisitor(); + void testCreateVisitorHighTimeout(); + void testCreateVisitorReplyNotReady(); + void testCreateVisitorReplyLastBucket(); + void testDestroyVisitor(); + void testVisitorInfo(); + void testDocBlock(); + void testDocBlockWithKeepTimeStamps(); + void testMultiOperation(); + void testBatchDocumentUpdate(); + + CPPUNIT_TEST_SUITE(DocumentApiConverterTest); + CPPUNIT_TEST(testPut); + CPPUNIT_TEST(testForwardedPut); + CPPUNIT_TEST(testRemove); + CPPUNIT_TEST(testGet); + 
CPPUNIT_TEST(testCreateVisitor); + CPPUNIT_TEST(testCreateVisitorHighTimeout); + CPPUNIT_TEST(testCreateVisitorReplyNotReady); + CPPUNIT_TEST(testCreateVisitorReplyLastBucket); + CPPUNIT_TEST(testDestroyVisitor); + CPPUNIT_TEST(testVisitorInfo); + CPPUNIT_TEST(testDocBlock); + CPPUNIT_TEST(testDocBlockWithKeepTimeStamps); + CPPUNIT_TEST(testMultiOperation); + CPPUNIT_TEST(testBatchDocumentUpdate); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(DocumentApiConverterTest); + +void DocumentApiConverterTest::testPut() +{ + Document::SP + doc(new Document(_html_type, DocumentId(DocIdString("test", "test")))); + + documentapi::PutDocumentMessage putmsg(doc); + putmsg.setTimestamp(1234); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(putmsg, _repo); + + api::PutCommand* pc = dynamic_cast<api::PutCommand*>(cmd.get()); + + CPPUNIT_ASSERT(pc); + CPPUNIT_ASSERT(pc->getDocument().get() == doc.get()); + + std::unique_ptr<mbus::Reply> reply = putmsg.createReply(); + CPPUNIT_ASSERT(reply.get()); + + std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI( + static_cast<documentapi::DocumentReply&>(*reply), *cmd); + api::PutReply* pr = dynamic_cast<api::PutReply*>(rep.get()); + CPPUNIT_ASSERT(pr); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(*pc, _repo); + + documentapi::PutDocumentMessage* mbusput = dynamic_cast<documentapi::PutDocumentMessage*>(mbusmsg.get()); + CPPUNIT_ASSERT(mbusput); + CPPUNIT_ASSERT(mbusput->getDocument().get() == doc.get()); + CPPUNIT_ASSERT(mbusput->getTimestamp() == 1234); +}; + +void DocumentApiConverterTest::testForwardedPut() +{ + Document::SP + doc(new Document(_html_type, DocumentId(DocIdString("test", "test")))); + + documentapi::PutDocumentMessage* putmsg = new documentapi::PutDocumentMessage(doc); + std::unique_ptr<mbus::Reply> reply(((documentapi::DocumentMessage*)putmsg)->createReply()); + 
reply->setMessage(std::unique_ptr<mbus::Message>(putmsg)); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(*putmsg, _repo); + ((storage::api::PutCommand*)cmd.get())->setTimestamp(1234); + + std::unique_ptr<storage::api::StorageReply> rep = cmd->makeReply(); + api::PutReply* pr = dynamic_cast<api::PutReply*>(rep.get()); + CPPUNIT_ASSERT(pr); + + _converter->transferReplyState(*pr, *reply); +} + +void DocumentApiConverterTest::testRemove() +{ + documentapi::RemoveDocumentMessage removemsg(document::DocumentId(document::DocIdString("test", "test"))); + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(removemsg, _repo); + + api::RemoveCommand* rc = dynamic_cast<api::RemoveCommand*>(cmd.get()); + + CPPUNIT_ASSERT(rc); + CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), rc->getDocumentId()); + + std::unique_ptr<mbus::Reply> reply = removemsg.createReply(); + CPPUNIT_ASSERT(reply.get()); + + std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI( + static_cast<documentapi::DocumentReply&>(*reply), *cmd); + api::RemoveReply* pr = dynamic_cast<api::RemoveReply*>(rep.get()); + CPPUNIT_ASSERT(pr); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(*rc, _repo); + + documentapi::RemoveDocumentMessage* mbusremove = dynamic_cast<documentapi::RemoveDocumentMessage*>(mbusmsg.get()); + CPPUNIT_ASSERT(mbusremove); + CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), mbusremove->getDocumentId()); +}; + +void DocumentApiConverterTest::testGet() +{ + documentapi::GetDocumentMessage getmsg( + document::DocumentId(document::DocIdString("test", "test")), + "foo bar"); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(getmsg, _repo); + + api::GetCommand* rc = dynamic_cast<api::GetCommand*>(cmd.get()); + + CPPUNIT_ASSERT(rc); + 
CPPUNIT_ASSERT_EQUAL(document::DocumentId(document::DocIdString("test", "test")), rc->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("foo bar"), rc->getFieldSet()); +}; + +void DocumentApiConverterTest::testCreateVisitor() +{ + documentapi::CreateVisitorMessage cv( + "mylib", + "myinstance", + "control-dest", + "data-dest"); + + cv.setTimeRemaining(123456); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(cv, _repo); + + api::CreateVisitorCommand* pc = dynamic_cast<api::CreateVisitorCommand*>(cmd.get()); + + CPPUNIT_ASSERT(pc); + CPPUNIT_ASSERT_EQUAL(vespalib::string("mylib"), pc->getLibraryName()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("control-dest"), pc->getControlDestination()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("data-dest"), pc->getDataDestination()); + CPPUNIT_ASSERT_EQUAL(123456u, pc->getTimeout()); +} + +void DocumentApiConverterTest::testCreateVisitorHighTimeout() +{ + documentapi::CreateVisitorMessage cv( + "mylib", + "myinstance", + "control-dest", + "data-dest"); + + cv.setTimeRemaining((uint64_t)std::numeric_limits<uint32_t>::max() + 1); // Will be INT_MAX + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(cv, _repo); + + api::CreateVisitorCommand* pc = dynamic_cast<api::CreateVisitorCommand*>(cmd.get()); + + CPPUNIT_ASSERT(pc); + CPPUNIT_ASSERT_EQUAL(vespalib::string("mylib"), pc->getLibraryName()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("control-dest"), pc->getControlDestination()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("data-dest"), pc->getDataDestination()); + CPPUNIT_ASSERT_EQUAL((uint32_t) std::numeric_limits<int32_t>::max(), + pc->getTimeout()); +} + +void DocumentApiConverterTest::testCreateVisitorReplyNotReady() +{ + documentapi::CreateVisitorMessage cv( + "mylib", + "myinstance", + "control-dest", + 
"data-dest"); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(cv, _repo); + CPPUNIT_ASSERT(cmd.get()); + api::CreateVisitorCommand& cvc = dynamic_cast<api::CreateVisitorCommand&>(*cmd); + + api::CreateVisitorReply cvr(cvc); + cvr.setResult(api::ReturnCode(api::ReturnCode::NOT_READY, "not ready")); + + std::unique_ptr<documentapi::CreateVisitorReply> reply( + dynamic_cast<documentapi::CreateVisitorReply*>( + cv.createReply().release())); + CPPUNIT_ASSERT(reply.get()); + + _converter->transferReplyState(cvr, *reply); + + CPPUNIT_ASSERT_EQUAL((uint32_t)documentapi::DocumentProtocol::ERROR_NODE_NOT_READY, reply->getError(0).getCode()); + + CPPUNIT_ASSERT_EQUAL(document::BucketId(INT_MAX), reply->getLastBucket()); +} + + +void DocumentApiConverterTest::testCreateVisitorReplyLastBucket() +{ + documentapi::CreateVisitorMessage cv( + "mylib", + "myinstance", + "control-dest", + "data-dest"); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(cv, _repo); + CPPUNIT_ASSERT(cmd.get()); + api::CreateVisitorCommand& cvc = dynamic_cast<api::CreateVisitorCommand&>(*cmd); + + + api::CreateVisitorReply cvr(cvc); + cvr.setLastBucket(document::BucketId(123)); + + + std::unique_ptr<documentapi::CreateVisitorReply> reply( + dynamic_cast<documentapi::CreateVisitorReply*>( + cv.createReply().release())); + + CPPUNIT_ASSERT(reply.get()); + + _converter->transferReplyState(cvr, *reply); + + CPPUNIT_ASSERT_EQUAL(document::BucketId(123), reply->getLastBucket()); +} + + +void DocumentApiConverterTest::testDestroyVisitor() +{ + documentapi::DestroyVisitorMessage cv("myinstance"); + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(cv, _repo); + + api::DestroyVisitorCommand* pc = dynamic_cast<api::DestroyVisitorCommand*>(cmd.get()); + + CPPUNIT_ASSERT(pc); + CPPUNIT_ASSERT_EQUAL(vespalib::string("myinstance"), pc->getInstanceId()); +} + +void +DocumentApiConverterTest::testVisitorInfo() +{ + 
api::VisitorInfoCommand vicmd; + std::vector<api::VisitorInfoCommand::BucketTimestampPair> bucketsCompleted; + bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 1), 0)); + bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 2), 0)); + bucketsCompleted.push_back(api::VisitorInfoCommand::BucketTimestampPair(document::BucketId(16, 4), 0)); + + vicmd.setBucketsCompleted(bucketsCompleted); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(vicmd, _repo); + + documentapi::VisitorInfoMessage* mbusvi = dynamic_cast<documentapi::VisitorInfoMessage*>(mbusmsg.get()); + CPPUNIT_ASSERT(mbusvi); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), mbusvi->getFinishedBuckets()[0]); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 2), mbusvi->getFinishedBuckets()[1]); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 4), mbusvi->getFinishedBuckets()[2]); + + std::unique_ptr<mbus::Reply> reply = mbusvi->createReply(); + CPPUNIT_ASSERT(reply.get()); + + std::unique_ptr<storage::api::StorageReply> rep = _converter->toStorageAPI( + static_cast<documentapi::DocumentReply&>(*reply), vicmd); + api::VisitorInfoReply* pr = dynamic_cast<api::VisitorInfoReply*>(rep.get()); + CPPUNIT_ASSERT(pr); +} + +void +DocumentApiConverterTest::testDocBlock() +{ + Document::SP + doc(new Document(_html_type, DocumentId(DocIdString("test", "test")))); + + char buffer[10000]; + vdslib::WritableDocumentList docBlock(_repo, buffer, sizeof(buffer)); + docBlock.addPut(*doc, 100); + + document::BucketIdFactory fac; + document::BucketId bucketId = fac.getBucketId(doc->getId()); + bucketId.setUsedBits(32); + + api::DocBlockCommand dbcmd(bucketId, docBlock, std::shared_ptr<void>()); + + dbcmd.setTimeout(123456); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(dbcmd, _repo); + + documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get()); + 
CPPUNIT_ASSERT(mbusdb); + + CPPUNIT_ASSERT_EQUAL((uint64_t)123456, mbusdb->getTimeRemaining()); + + const vdslib::DocumentList& list = mbusdb->getOperations(); + CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size()); + CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get())); + + std::unique_ptr<mbus::Reply> reply = mbusdb->createReply(); + CPPUNIT_ASSERT(reply.get()); + + std::unique_ptr<storage::api::StorageReply> rep = + _converter->toStorageAPI(static_cast<documentapi::DocumentReply&>(*reply), dbcmd); + api::DocBlockReply* pr = dynamic_cast<api::DocBlockReply*>(rep.get()); + CPPUNIT_ASSERT(pr); +} + + +void +DocumentApiConverterTest::testDocBlockWithKeepTimeStamps() +{ + char buffer[10000]; + vdslib::WritableDocumentList docBlock(_repo, buffer, sizeof(buffer)); + api::DocBlockCommand dbcmd(document::BucketId(0), docBlock, std::shared_ptr<void>()); + + { + CPPUNIT_ASSERT_EQUAL(dbcmd.keepTimeStamps(), false); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(dbcmd, _repo); + + documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get()); + CPPUNIT_ASSERT(mbusdb); + + CPPUNIT_ASSERT_EQUAL(mbusdb->keepTimeStamps(), false); + } + + { + dbcmd.keepTimeStamps(true); + CPPUNIT_ASSERT_EQUAL(dbcmd.keepTimeStamps(), true); + + std::unique_ptr<mbus::Message> mbusmsg = + _converter->toDocumentAPI(dbcmd, _repo); + + documentapi::MultiOperationMessage* mbusdb = dynamic_cast<documentapi::MultiOperationMessage*>(mbusmsg.get()); + CPPUNIT_ASSERT(mbusdb); + + CPPUNIT_ASSERT_EQUAL(mbusdb->keepTimeStamps(), true); + } + +} + + +void +DocumentApiConverterTest::testMultiOperation() +{ + //create a document + Document::SP + doc(new Document(_html_type, DocumentId(DocIdString("test", "test")))); + + document::BucketIdFactory fac; + document::BucketId bucketId = fac.getBucketId(doc->getId()); + bucketId.setUsedBits(32); + + { + documentapi::MultiOperationMessage momsg(_repo, 
bucketId, 10000); + + vdslib::WritableDocumentList operations(_repo, &(momsg.getBuffer()[0]), + momsg.getBuffer().size()); + operations.addPut(*doc, 100); + + momsg.setOperations(operations); + + CPPUNIT_ASSERT(momsg.getBuffer().size() > 0); + + // Convert it to Storage API + std::unique_ptr<api::StorageCommand> stcmd = + _converter->toStorageAPI(momsg, _repo); + + api::MultiOperationCommand* mocmd = dynamic_cast<api::MultiOperationCommand*>(stcmd.get()); + CPPUNIT_ASSERT(mocmd); + CPPUNIT_ASSERT(mocmd->getBuffer().size() > 0); + + // Get operations from Storage API message and check document + const vdslib::DocumentList& list = mocmd->getOperations(); + CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size()); + CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get())); + + // Create Storage API Reply + std::unique_ptr<api::MultiOperationReply> moreply = std::unique_ptr<api::MultiOperationReply>(new api::MultiOperationReply(*mocmd)); + CPPUNIT_ASSERT(moreply.get()); + + // convert storage api reply to mbus reply..... + // ... 
+ } + + { + api::MultiOperationCommand mocmd(_repo, bucketId, 10000, false); + mocmd.getOperations().addPut(*doc, 100); + + // Convert it to documentapi + std::unique_ptr<mbus::Message> mbmsg = + _converter->toDocumentAPI(mocmd, _repo); + documentapi::MultiOperationMessage* momsg = dynamic_cast<documentapi::MultiOperationMessage*>(mbmsg.get()); + CPPUNIT_ASSERT(momsg); + + // Get operations from Document API msg and check document + const vdslib::DocumentList& list = momsg->getOperations(); + CPPUNIT_ASSERT_EQUAL((uint32_t)1, list.size()); + CPPUNIT_ASSERT_EQUAL(*doc, *dynamic_cast<document::Document*>(list.begin()->getDocument().get())); + + // Create Document API reply + mbus::Reply::UP moreply = momsg->createReply(); + CPPUNIT_ASSERT(moreply.get()); + + //Convert DocumentAPI reply to storageapi reply + std::unique_ptr<api::StorageReply> streply = + _converter->toStorageAPI(static_cast<documentapi::DocumentReply&>(*moreply), mocmd); + api::MultiOperationReply* mostreply = dynamic_cast<api::MultiOperationReply*>(streply.get()); + CPPUNIT_ASSERT(mostreply); + + } +} + +void +DocumentApiConverterTest::testBatchDocumentUpdate() +{ + std::vector<document::DocumentUpdate::SP > updates; + + { + document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test1")); + document::DocumentUpdate::SP update( + new document::DocumentUpdate(_html_type, docId)); + updates.push_back(update); + } + + { + document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test2")); + document::DocumentUpdate::SP update( + new document::DocumentUpdate(_html_type, docId)); + updates.push_back(update); + } + + { + document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test3")); + document::DocumentUpdate::SP update( + new document::DocumentUpdate(_html_type, docId)); + updates.push_back(update); + } + + std::shared_ptr<documentapi::BatchDocumentUpdateMessage> msg( + new documentapi::BatchDocumentUpdateMessage(1234)); + for (std::size_t i = 0; i < 
updates.size(); ++i) { + msg->addUpdate(updates[i]); + } + + std::unique_ptr<storage::api::StorageCommand> cmd = + _converter->toStorageAPI(*msg, _repo); + api::BatchDocumentUpdateCommand* batchCmd = dynamic_cast<api::BatchDocumentUpdateCommand*>(cmd.get()); + CPPUNIT_ASSERT(batchCmd); + CPPUNIT_ASSERT_EQUAL(updates.size(), batchCmd->getUpdates().size()); + for (std::size_t i = 0; i < updates.size(); ++i) { + CPPUNIT_ASSERT_EQUAL(*updates[i], *batchCmd->getUpdates()[i]); + } + + api::BatchDocumentUpdateReply batchReply(*batchCmd); + batchReply.getDocumentsNotFound().resize(3); + batchReply.getDocumentsNotFound()[0] = true; + batchReply.getDocumentsNotFound()[2] = true; + + std::unique_ptr<mbus::Reply> mbusReply = msg->createReply(); + documentapi::BatchDocumentUpdateReply* mbusBatchReply( + dynamic_cast<documentapi::BatchDocumentUpdateReply*>(mbusReply.get())); + CPPUNIT_ASSERT(mbusBatchReply != 0); + + _converter->transferReplyState(batchReply, *mbusReply); + + CPPUNIT_ASSERT_EQUAL(std::size_t(3), mbusBatchReply->getDocumentsNotFound().size()); + CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[0] == true); + CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[1] == false); + CPPUNIT_ASSERT(mbusBatchReply->getDocumentsNotFound()[2] == true); +} + +} diff --git a/storage/src/tests/storageserver/dummystoragelink.cpp b/storage/src/tests/storageserver/dummystoragelink.cpp new file mode 100644 index 00000000000..7194f1fba3d --- /dev/null +++ b/storage/src/tests/storageserver/dummystoragelink.cpp @@ -0,0 +1,182 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <tests/common/dummystoragelink.h> +#include <sys/time.h> + +namespace storage { + +DummyStorageLink* DummyStorageLink::_last(0); + +DummyStorageLink::DummyStorageLink() + : StorageLink("Dummy storage link"), + _commands(), + _replies(), + _injected(), + _autoReply(false), + _useDispatch(false), + _ignore(false), + _waitMonitor() +{ + _last = this; +} + +DummyStorageLink::~DummyStorageLink() +{ + // Often a chain with dummy link on top is deleted in unit tests. + // If they haven't been closed already, close them for a cleaner + // shutdown + if (getState() == OPENED) { + close(); + flush(); + } + closeNextLink(); + reset(); +} + +bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd) +{ + if (_ignore) { + return false; + } + if (_injected.size() > 0) { + vespalib::LockGuard guard(_lock); + sendUp(*_injected.begin()); + _injected.pop_front(); + } else if (_autoReply) { + if (!cmd->getType().isReply()) { + std::shared_ptr<api::StorageReply> reply( + std::dynamic_pointer_cast<api::StorageCommand>(cmd) + ->makeReply().release()); + reply->setResult(api::ReturnCode( + api::ReturnCode::OK, "Automatically generated reply")); + sendUp(reply); + } + } + if (isBottom()) { + vespalib::MonitorGuard lock(_waitMonitor); + { + vespalib::LockGuard guard(_lock); + _commands.push_back(cmd); + } + lock.broadcast(); + return true; + } + return StorageLink::onDown(cmd); +} + +bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) { + if (isTop()) { + vespalib::MonitorGuard lock(_waitMonitor); + { + vespalib::LockGuard guard(_lock); + _replies.push_back(reply); + } + lock.broadcast(); + return true; + } + return StorageLink::onUp(reply); + +} + +void DummyStorageLink::injectReply(api::StorageReply* reply) +{ + assert(reply); + vespalib::LockGuard guard(_lock); + _injected.push_back(std::shared_ptr<api::StorageReply>(reply)); +} + +void 
DummyStorageLink::reset() { + vespalib::MonitorGuard lock(_waitMonitor); + vespalib::LockGuard guard(_lock); + _commands.clear(); + _replies.clear(); + _injected.clear(); +} + +void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout) +{ + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); + vespalib::MonitorGuard lock(_waitMonitor); + while (_commands.size() + _replies.size() < msgCount) { + if (timeout != 0 && clock.getTimeInMillis() > endTime) { + std::ostringstream ost; + ost << "Timed out waiting for " << msgCount << " messages to " + << "arrive in dummy storage link. Only " + << (_commands.size() + _replies.size()) << " messages seen " + << "after timout of " << timeout << " seconds was reached."; + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); + } + if (timeout >= 0) { + lock.wait((endTime - clock.getTimeInMillis()).getTime()); + } else { + lock.wait(); + } + } +} + +void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) +{ + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); + vespalib::MonitorGuard lock(_waitMonitor); + while (true) { + for (uint32_t i=0; i<_commands.size(); ++i) { + if (_commands[i]->getType() == type) return; + } + for (uint32_t i=0; i<_replies.size(); ++i) { + if (_replies[i]->getType() == type) return; + } + if (timeout != 0 && clock.getTimeInMillis() > endTime) { + std::ostringstream ost; + ost << "Timed out waiting for " << type << " message to " + << "arrive in dummy storage link. 
Only " + << (_commands.size() + _replies.size()) << " messages seen " + << "after timout of " << timeout << " seconds was reached."; + if (_commands.size() == 1) { + ost << " Found command of type " << _commands[0]->getType(); + } + if (_replies.size() == 1) { + ost << " Found command of type " << _replies[0]->getType(); + } + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); + } + if (timeout >= 0) { + lock.wait((endTime - clock.getTimeInMillis()).getTime()); + } else { + lock.wait(); + } + } +} + +api::StorageMessage::SP +DummyStorageLink::getAndRemoveMessage(const api::MessageType& type) +{ + vespalib::MonitorGuard lock(_waitMonitor); + for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin(); + it != _commands.end(); ++it) + { + if ((*it)->getType() == type) { + api::StorageMessage::SP result(*it); + _commands.erase(it); + return result; + } + } + for (std::vector<api::StorageMessage::SP>::iterator it = _replies.begin(); + it != _replies.end(); ++it) + { + if ((*it)->getType() == type) { + api::StorageMessage::SP result(*it); + _replies.erase(it); + return result; + } + } + std::ostringstream ost; + ost << "No message of type " << type << " found."; + throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); +} + +} // storage diff --git a/storage/src/tests/storageserver/dummystoragelink.h b/storage/src/tests/storageserver/dummystoragelink.h new file mode 100644 index 00000000000..cb9df8c5642 --- /dev/null +++ b/storage/src/tests/storageserver/dummystoragelink.h @@ -0,0 +1,115 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include <vespa/vespalib/util/sync.h> +#include <list> +#include <sstream> +#include <vespa/storageapi/messageapi/storagecommand.h> +#include <string> +#include <vector> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/common/bucketmessages.h> +#include <vespa/storageapi/message/internal.h> + +class FastOS_ThreadPool; + +namespace storage { + +class DummyStorageLink : public StorageLink { + + mutable vespalib::Lock _lock; // to protect below containers: + std::vector<api::StorageMessage::SP> _commands; + std::vector<api::StorageMessage::SP> _replies; + std::list<api::StorageMessage::SP> _injected; + + bool _autoReply; + bool _useDispatch; + bool _ignore; + static DummyStorageLink* _last; + vespalib::Monitor _waitMonitor; + +public: + DummyStorageLink(); + ~DummyStorageLink(); + + bool onDown(const api::StorageMessage::SP&); + bool onUp(const api::StorageMessage::SP&); + + void addOnTopOfChain(StorageLink& link) { + link.addTestLinkOnTop(this); + } + + void print(std::ostream& ost, bool verbose, const std::string& indent) const + { + (void) verbose; + ost << indent << "DummyStorageLink(" + << "autoreply = " << (_autoReply ? "on" : "off") + << ", dispatch = " << (_useDispatch ? 
"on" : "off") + << ", " << _commands.size() << " commands" + << ", " << _replies.size() << " replies"; + if (_injected.size() > 0) + ost << ", " << _injected.size() << " injected"; + ost << ")"; + } + + void injectReply(api::StorageReply* reply); + void reset(); + void setAutoreply(bool autoReply) { _autoReply = autoReply; } + void setIgnore(bool ignore) { _ignore = ignore; } + // Timeout is given in seconds + void waitForMessages(unsigned int msgCount = 1, int timeout = -1); + // Wait for a single message of a given type + void waitForMessage(const api::MessageType&, int timeout = -1); + + api::StorageMessage::SP getCommand(size_t i) const { + vespalib::LockGuard guard(_lock); + api::StorageMessage::SP ret = _commands[i]; + return ret; + } + api::StorageMessage::SP getReply(size_t i) const { + vespalib::LockGuard guard(_lock); + api::StorageMessage::SP ret = _replies[i]; + return ret; + } + size_t getNumCommands() const { + vespalib::LockGuard guard(_lock); + return _commands.size(); + } + size_t getNumReplies() const { + vespalib::LockGuard guard(_lock); + return _replies.size(); + } + + const std::vector<api::StorageMessage::SP>& getCommands() const + { return _commands; } + const std::vector<api::StorageMessage::SP>& getReplies() const + { return _replies; } + + std::vector<api::StorageMessage::SP> getCommandsOnce() { + vespalib::MonitorGuard lock(_waitMonitor); + std::vector<api::StorageMessage::SP> retval; + { + vespalib::LockGuard guard(_lock); + retval.swap(_commands); + } + return retval; + } + + std::vector<api::StorageMessage::SP> getRepliesOnce() { + vespalib::MonitorGuard lock(_waitMonitor); + std::vector<api::StorageMessage::SP> retval; + { + vespalib::LockGuard guard(_lock); + retval.swap(_replies); + } + return retval; + } + + api::StorageMessage::SP getAndRemoveMessage(const api::MessageType&); + + static DummyStorageLink* getLast() { return _last; } +}; + +} + diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp 
b/storage/src/tests/storageserver/mergethrottlertest.cpp new file mode 100644 index 00000000000..e705db80788 --- /dev/null +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -0,0 +1,1566 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <cppunit/extensions/HelperMacros.h> +#include <memory> +#include <iterator> +#include <vector> +#include <algorithm> +#include <ctime> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <tests/common/testhelper.h> +#include <tests/common/storagelinktest.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/dummystoragelink.h> +#include <vespa/storage/storageserver/mergethrottler.h> +#include <vespa/storage/persistence/messages.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/state.h> + +using namespace document; +using namespace storage::api; + +namespace storage { + +namespace { + +struct MergeBuilder +{ + document::BucketId _bucket; + api::Timestamp _maxTimestamp; + std::vector<uint16_t> _nodes; + std::vector<uint16_t> _chain; + uint64_t _clusterStateVersion; + + MergeBuilder(const document::BucketId& bucket) + : _bucket(bucket), + _maxTimestamp(1234), + _chain(), + _clusterStateVersion(1) + { + nodes(0, 1, 2); + } + + MergeBuilder& nodes(uint16_t n0) { + _nodes.push_back(n0); + return *this; + } + MergeBuilder& nodes(uint16_t n0, uint16_t n1) { + _nodes.push_back(n0); + _nodes.push_back(n1); + return *this; + } + MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) { + _nodes.push_back(n0); + _nodes.push_back(n1); + _nodes.push_back(n2); + return *this; + } + MergeBuilder& maxTimestamp(api::Timestamp maxTs) { + _maxTimestamp = maxTs; + return *this; + } + MergeBuilder& clusterStateVersion(uint64_t csv) { + _clusterStateVersion = csv; + return *this; + } + 
MergeBuilder& chain(uint16_t n0) { + _chain.clear(); + _chain.push_back(n0); + return *this; + } + MergeBuilder& chain(uint16_t n0, uint16_t n1) { + _chain.clear(); + _chain.push_back(n0); + _chain.push_back(n1); + return *this; + } + MergeBuilder& chain(uint16_t n0, uint16_t n1, uint16_t n2) { + _chain.clear(); + _chain.push_back(n0); + _chain.push_back(n1); + _chain.push_back(n2); + return *this; + } + + api::MergeBucketCommand::SP create() const { + std::vector<api::MergeBucketCommand::Node> n; + for (uint32_t i = 0; i < _nodes.size(); ++i) { + n.push_back(_nodes[i]); + } + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(_bucket, n, _maxTimestamp, + _clusterStateVersion, _chain)); + StorageMessageAddress address("storage", lib::NodeType::STORAGE, _nodes[0]); + cmd->setAddress(address); + return cmd; + } +}; + +std::shared_ptr<api::SetSystemStateCommand> +makeSystemStateCmd(const std::string& state) +{ + return std::make_shared<api::SetSystemStateCommand>( + lib::ClusterState(state)); +} + +} // anon ns + +class MergeThrottlerTest : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(MergeThrottlerTest); + CPPUNIT_TEST(testMergesConfig); + CPPUNIT_TEST(testChain); + CPPUNIT_TEST(testWithSourceOnlyNode); + CPPUNIT_TEST(test42DistributorBehavior); + CPPUNIT_TEST(test42DistributorBehaviorDoesNotTakeOwnership); + CPPUNIT_TEST(testEndOfChainExecutionDoesNotTakeOwnership); + CPPUNIT_TEST(testResendHandling); + CPPUNIT_TEST(testPriorityQueuing); + CPPUNIT_TEST(testCommandInQueueDuplicateOfKnownMerge); + CPPUNIT_TEST(testInvalidReceiverNode); + CPPUNIT_TEST(testForwardQueuedMerge); + CPPUNIT_TEST(testExecuteQueuedMerge); + CPPUNIT_TEST(testFlush); + CPPUNIT_TEST(testUnseenMergeWithNodeInChain); + CPPUNIT_TEST(testMergeWithNewerClusterStateFlushesOutdatedQueued); + CPPUNIT_TEST(testUpdatedClusterStateFlushesOutdatedQueued); + CPPUNIT_TEST(test42MergesDoNotTriggerFlush); + CPPUNIT_TEST(testOutdatedClusterStateMergesAreRejectedOnArrival); + 
CPPUNIT_TEST(testUnknownMergeWithSelfInChain); + CPPUNIT_TEST(testBusyReturnedOnFullQueue); + CPPUNIT_TEST(testBrokenCycle); + CPPUNIT_TEST(testGetBucketDiffCommandNotInActiveSetIsRejected); + CPPUNIT_TEST(testApplyBucketDiffCommandNotInActiveSetIsRejected); + CPPUNIT_TEST(testNewClusterStateAbortsAllOutdatedActiveMerges); + CPPUNIT_TEST_SUITE_END(); +public: + void setUp(); + void tearDown(); + + void testMergesConfig(); + void testChain(); + void testWithSourceOnlyNode(); + void test42DistributorBehavior(); + void test42DistributorBehaviorDoesNotTakeOwnership(); + void testEndOfChainExecutionDoesNotTakeOwnership(); + void testResendHandling(); + void testPriorityQueuing(); + void testCommandInQueueDuplicateOfKnownMerge(); + void testInvalidReceiverNode(); + void testForwardQueuedMerge(); + void testExecuteQueuedMerge(); + void testFlush(); + void testUnseenMergeWithNodeInChain(); + void testMergeWithNewerClusterStateFlushesOutdatedQueued(); + void testUpdatedClusterStateFlushesOutdatedQueued(); + void test42MergesDoNotTriggerFlush(); + void testOutdatedClusterStateMergesAreRejectedOnArrival(); + void testUnknownMergeWithSelfInChain(); + void testBusyReturnedOnFullQueue(); + void testBrokenCycle(); + void testGetBucketDiffCommandNotInActiveSetIsRejected(); + void testApplyBucketDiffCommandNotInActiveSetIsRejected(); + void testNewClusterStateAbortsAllOutdatedActiveMerges(); +private: + static const int _storageNodeCount = 3; + static const int _messageWaitTime = 100; + + // Using n storage node links and dummy servers + std::vector<std::shared_ptr<DummyStorageLink> > _topLinks; + std::vector<std::shared_ptr<TestServiceLayerApp> > _servers; + std::vector<MergeThrottler*> _throttlers; + std::vector<DummyStorageLink*> _bottomLinks; + + api::MergeBucketCommand::SP sendMerge(const MergeBuilder&); + + void sendAndExpectReply( + const std::shared_ptr<api::StorageMessage>& msg, + const api::MessageType& expectedReplyType, + api::ReturnCode::Result expectedResultCode); +}; 
+ +const int MergeThrottlerTest::_storageNodeCount; +const int MergeThrottlerTest::_messageWaitTime; + +CPPUNIT_TEST_SUITE_REGISTRATION(MergeThrottlerTest); + +void +MergeThrottlerTest::setUp() +{ + vdstestlib::DirConfig config(getStandardConfig(true)); + + for (int i = 0; i < _storageNodeCount; ++i) { + std::unique_ptr<TestServiceLayerApp> server( + new TestServiceLayerApp(DiskCount(1), NodeIndex(i))); + server->setClusterState(lib::ClusterState( + "distributor:100 storage:100 version:1")); + std::unique_ptr<DummyStorageLink> top; + + top.reset(new DummyStorageLink); + MergeThrottler* throttler = new MergeThrottler(config.getConfigId(), server->getComponentRegister()); + // MergeThrottler will be sandwiched in between two dummy links + top->push_back(std::unique_ptr<StorageLink>(throttler)); + DummyStorageLink* bottom = new DummyStorageLink; + throttler->push_back(std::unique_ptr<StorageLink>(bottom)); + + _servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release())); + _throttlers.push_back(throttler); + _bottomLinks.push_back(bottom); + top->open(); + _topLinks.push_back(std::shared_ptr<DummyStorageLink>(top.release())); + } +} + +void +MergeThrottlerTest::tearDown() +{ + for (std::size_t i = 0; i < _topLinks.size(); ++i) { + if (_topLinks[i]->getState() == StorageLink::OPENED) { + _topLinks[i]->close(); + _topLinks[i]->flush(); + } + _topLinks[i] = std::shared_ptr<DummyStorageLink>(); + } + _topLinks.clear(); + _bottomLinks.clear(); + _throttlers.clear(); + _servers.clear(); +} + +namespace { + +template <typename Iterator> +bool +checkChain(const StorageMessage::SP& msg, + Iterator first, Iterator end) +{ + const MergeBucketCommand& cmd = + dynamic_cast<const MergeBucketCommand&>(*msg); + + if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) { + return false; + } + + return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); +} + +void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int 
timeout) +{ + std::time_t start = std::time(0); + while (true) { + std::size_t count; + { + vespalib::LockGuard lock(throttler.getStateLock()); + count = throttler.getMergeQueue().size(); + } + if (count == sz) { + break; + } + std::time_t now = std::time(0); + if (now - start > timeout) { + std::ostringstream os; + os << "Timeout while waiting for merge queue with " << sz << " items. Had " + << count << " at timeout."; + throw vespalib::IllegalStateException(os.str(), VESPA_STRLOC); + } + FastOS_Thread::Sleep(1); + } +} + +} + +// Extremely simple test that just checks that (min|max)_merges_per_node +// under the stor-server config gets propagated to all the nodes +void +MergeThrottlerTest::testMergesConfig() +{ + for (int i = 0; i < _storageNodeCount; ++i) { + CPPUNIT_ASSERT_EQUAL(uint32_t(25), _throttlers[i]->getThrottlePolicy().getMaxPendingCount()); + CPPUNIT_ASSERT_EQUAL(std::size_t(20), _throttlers[i]->getMaxQueueSize()); + } +} + +// Test that a distributor sending a merge to the lowest-index storage +// node correctly invokes a merge forwarding chain and subsequent unwind. +void +MergeThrottlerTest::testChain() +{ + uint16_t indices[_storageNodeCount]; + for (int i = 0; i < _storageNodeCount; ++i) { + indices[i] = i; + _servers[i]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:123")); + } + + BucketId bid(14, 0x1337); + + // Use different node permutations to ensure it works no matter which node is + // set as the executor. More specifically, _all_ permutations. 
+ do { + uint16_t lastNodeIdx = _storageNodeCount - 1; + uint16_t executorNode = indices[0]; + + //std::cout << "\n----\n"; + std::vector<MergeBucketCommand::Node> nodes; + for (int i = 0; i < _storageNodeCount; ++i) { + nodes.push_back(MergeBucketCommand::Node(indices[i], (i + executorNode) % 2 == 0)); + //std::cout << indices[i] << " "; + } + //std::cout << "\n"; + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, UINT_MAX, 123)); + cmd->setPriority(7); + cmd->setTimeout(54321); + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + cmd->setAddress(address); + const uint16_t distributorIndex = 123; + cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded + + StorageMessage::SP fwd = cmd; + StorageMessage::SP fwdToExec; + + // TODO: make generic wrt. _storageNodeCount + + for (int i = 0; i < _storageNodeCount - 1; ++i) { + if (i == executorNode) { + fwdToExec = fwd; + } + CPPUNIT_ASSERT_EQUAL(uint16_t(i), _servers[i]->getIndex()); + // No matter the node order, command is always sent to node 0 -> 1 -> 2 etc + _topLinks[i]->sendDown(fwd); + _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + //std::cout << "fwd " << i << " -> " << i+1 << "\n"; + + // Forwarded merge should not be sent down. 
Should not be necessary + // to lock throttler here, since it should be sleeping like a champion + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[i]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[i]->getActiveMerges().size()); + + fwd = _topLinks[i]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(i + 1), fwd->getAddress()->getIndex()); + CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex()); + { + //uint16_t chain[] = { 0 }; + std::vector<uint16_t> chain; + for (int j = 0; j <= i; ++j) { + chain.push_back(j); + } + CPPUNIT_ASSERT(checkChain(fwd, chain.begin(), chain.end())); + } + // Ensure priority, cluster state version and timeout is correctly forwarded + CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority())); + CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + } + + _topLinks[lastNodeIdx]->sendDown(fwd); + + // If node 2 is the first in the node list, it should immediately execute + // the merge. Otherwise, a cycle with the first node should be formed. 
+ if (executorNode != lastNodeIdx) { + //std::cout << "cycle " << lastNodeIdx << " -> " << executorNode << "\n"; + _topLinks[lastNodeIdx]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + // Forwarded merge should not be sent down + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[lastNodeIdx]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[lastNodeIdx]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[lastNodeIdx]->getActiveMerges().size()); + + fwd = _topLinks[lastNodeIdx]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(executorNode), fwd->getAddress()->getIndex()); + CPPUNIT_ASSERT_EQUAL(distributorIndex, dynamic_cast<const StorageCommand&>(*fwd).getSourceIndex()); + { + std::vector<uint16_t> chain; + for (int j = 0; j < _storageNodeCount; ++j) { + chain.push_back(j); + } + CPPUNIT_ASSERT(checkChain(fwd, chain.begin(), chain.end())); + } + CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority())); + CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + + _topLinks[executorNode]->sendDown(fwd); + } + + _bottomLinks[executorNode]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + // Forwarded merge has now been sent down to persistence layer + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[executorNode]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[executorNode]->getNumReplies()); // No reply sent yet + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size()); // no re-registering merge + + if (executorNode != lastNodeIdx) { + // The MergeBucketCommand that is kept in the executor node should + // be the one from the node it initially got it from, NOT the one + // from the last node, since the chain has looped + 
CPPUNIT_ASSERT(_throttlers[executorNode]->getActiveMerges().find(bid) + != _throttlers[executorNode]->getActiveMerges().end()); + CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()), + _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get()); + } + + // Send reply up from persistence layer to simulate a completed + // merge operation. Chain should now unwind properly + fwd = _bottomLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(7, static_cast<int>(fwd->getPriority())); + CPPUNIT_ASSERT_EQUAL(uint32_t(123), dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(54321), dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<")); + _bottomLinks[executorNode]->sendUp(reply); + + _topLinks[executorNode]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + if (executorNode != lastNodeIdx) { + // Merge should not be removed yet from executor, since it's pending an unwind + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[executorNode]->getActiveMerges().size()); + CPPUNIT_ASSERT_EQUAL(static_cast<StorageMessage*>(fwdToExec.get()), + _throttlers[executorNode]->getActiveMerges().find(bid)->second.getMergeCmd().get()); + } + // MergeBucketReply waiting to be sent back to node 2. NOTE: we don't have any + // transport context stuff set up here to perform the reply mapping, so we + // have to emulate it + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[executorNode]->getNumReplies()); + + StorageMessage::SP unwind = _topLinks[executorNode]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(uint16_t(executorNode), unwind->getAddress()->getIndex()); + + // eg: 0 -> 2 -> 1 -> 0. 
Or: 2 -> 1 -> 0 if no cycle + for (int i = (executorNode != lastNodeIdx ? _storageNodeCount - 1 : _storageNodeCount - 2); i >= 0; --i) { + //std::cout << "unwind " << i << "\n"; + + _topLinks[i]->sendDown(unwind); + _topLinks[i]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[i]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[i]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[i]->getActiveMerges().size()); + + unwind = _topLinks[i]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(uint16_t(i), unwind->getAddress()->getIndex()); + } + + const MergeBucketReply& mbr = dynamic_cast<const MergeBucketReply&>(*unwind); + + CPPUNIT_ASSERT_EQUAL(ReturnCode::OK, mbr.getResult().getResult()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("Great success! :D-|-<"), mbr.getResult().getMessage()); + CPPUNIT_ASSERT_EQUAL(bid, mbr.getBucketId()); + + } while (std::next_permutation(indices, indices + _storageNodeCount)); + + //std::cout << "\n" << *_topLinks[0] << "\n"; +} + +void +MergeThrottlerTest::testWithSourceOnlyNode() +{ + BucketId bid(14, 0x1337); + + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 0); + + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(2); + nodes.push_back(MergeBucketCommand::Node(1, true)); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, UINT_MAX, 123)); + + cmd->setAddress(address); + _topLinks[0]->sendDown(cmd); + + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(1), fwd->getAddress()->getIndex()); + + _topLinks[1]->sendDown(fwd); + + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + 
CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex()); + + _topLinks[2]->sendDown(fwd); + + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + fwd = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(0), fwd->getAddress()->getIndex()); + + _topLinks[0]->sendDown(fwd); + _bottomLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + _bottomLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Great success! :D-|-<")); + _bottomLinks[0]->sendUp(reply); + + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(uint16_t(0), fwd->getAddress()->getIndex()); + + // Assume everything's fine from here on out +} + +// 4.2 distributors don't guarantee they'll send to lowest node +// index, so we must detect such situations and execute the merge +// immediately rather than attempt to chain it. Test that this +// is done correctly. 
+void +MergeThrottlerTest::test42DistributorBehavior() +{ + BucketId bid(32, 0xfeef00); + + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, 1234)); + + // Send to node 1, which is not the lowest index + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1); + + cmd->setAddress(address); + _topLinks[1]->sendDown(cmd); + _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + // Should now have been sent to persistence layer + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[1]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies()); // No reply sent yet + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[1]->getActiveMerges().size()); + + // Send reply up from persistence layer to simulate a completed + // merge operation. Merge should be removed from state. + _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!")); + _bottomLinks[1]->sendUp(reply); + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[1]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[1]->getMetrics().local.ok.getValue()); +} + +// Test that we don't take ownership of the merge command when we're +// just passing it through to the persistence layer when receiving +// a merge command that presumably comes form a 4.2 distributor +void +MergeThrottlerTest::test42DistributorBehaviorDoesNotTakeOwnership() +{ + BucketId bid(32, 0xfeef00); 
+ + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, 1234)); + + // Send to node 1, which is not the lowest index + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1); + + cmd->setAddress(address); + _topLinks[1]->sendDown(cmd); + _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + // Should now have been sent to persistence layer + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[1]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies()); // No reply sent yet + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[1]->getActiveMerges().size()); + + _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // To ensure we don't try to deref any non-owned messages + framework::HttpUrlPath path("?xml"); + std::ostringstream ss; + _throttlers[1]->reportStatus(ss, path); + + // Flush throttler (synchronously). 
Should NOT generate a reply + // for the merge command, as it is not owned by the throttler + StorageLinkTest::callOnFlush(*_throttlers[1], true); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[1]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size()); + + // Send a belated reply from persistence up just to ensure the + // throttler doesn't throw a fit if it receives an unknown merge + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!")); + _bottomLinks[1]->sendUp(reply); + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[1]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[1]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[1]->getActiveMerges().size()); +} + +// Test that we don't take ownership of the merge command when we're +// just passing it through to the persistence layer when we're at the +// the end of the chain and also the designated executor +void +MergeThrottlerTest::testEndOfChainExecutionDoesNotTakeOwnership() +{ + BucketId bid(32, 0xfeef00); + + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(2); + nodes.push_back(1); + nodes.push_back(0); + std::vector<uint16_t> chain; + chain.push_back(0); + chain.push_back(1); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, 1234, 1, chain)); + + // Send to last node, which is not the lowest index + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); + + cmd->setAddress(address); + _topLinks[2]->sendDown(cmd); + _bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + // Should now have been sent to persistence layer + 
CPPUNIT_ASSERT_EQUAL(std::size_t(1), _bottomLinks[2]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[2]->getNumReplies()); // No reply sent yet + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _throttlers[2]->getActiveMerges().size()); + + _bottomLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // To ensure we don't try to deref any non-owned messages + framework::HttpUrlPath path(""); + std::ostringstream ss; + _throttlers[2]->reportStatus(ss, path); + + // Flush throttler (synchronously). Should NOT generate a reply + // for the merge command, as it is not owned by the throttler + StorageLinkTest::callOnFlush(*_throttlers[2], true); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[2]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[2]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[2]->getActiveMerges().size()); + + // Send a belated reply from persistence up just to ensure the + // throttler doesn't throw a fit if it receives an unknown merge + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cmd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Tonight we dine on turtle soup!")); + _bottomLinks[2]->sendUp(reply); + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[2]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[2]->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _throttlers[2]->getActiveMerges().size()); +} + +// Test that nodes resending a merge command won't lead to duplicate +// state registration/forwarding or erasing the already present state +// information. 
+void +MergeThrottlerTest::testResendHandling() +{ + BucketId bid(32, 0xbadbed); + + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, 1234)); + + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1); + + cmd->setAddress(address); + _topLinks[0]->sendDown(cmd); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // Resend from "distributor". Just use same message, as that won't matter here + _topLinks[0]->sendDown(cmd); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + // Reply should be BUSY + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::BUSY); + + _topLinks[1]->sendDown(fwd); + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + _topLinks[2]->sendDown(fwd); + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + _topLinks[2]->sendDown(fwd); + _topLinks[2]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + // Reply should be BUSY + reply = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::BUSY); + + fwd = _topLinks[2]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + _topLinks[0]->sendDown(fwd); + _bottomLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + _topLinks[0]->sendDown(fwd); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + reply = 
_topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::BUSY); +} + +void +MergeThrottlerTest::testPriorityQueuing() +{ + // Fill up all active merges + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + CPPUNIT_ASSERT(maxPending >= 4u); + for (std::size_t i = 0; i < maxPending; ++i) { + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234)); + cmd->setPriority(100); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 0 queued + _topLinks[0]->waitForMessages(maxPending, 5); + waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime); + + // Queue up some merges with different priorities + int priorities[4] = { 200, 150, 120, 240 }; + int sortedPris[4] = { 120, 150, 200, 240 }; + for (int i = 0; i < 4; ++i) { + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, i), nodes, 1234)); + cmd->setPriority(priorities[i]); + _topLinks[0]->sendDown(cmd); + } + + waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime); + + // Remove all but 4 forwarded merges + for (std::size_t i = 0; i < maxPending - 4; ++i) { + _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + } + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(4), _topLinks[0]->getNumReplies()); + + // Now when we start replying to merges, queued merges should be + // processed in priority order + for (int i = 0; i < 4; ++i) { + StorageMessage::SP replyTo = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*replyTo))); + reply->setResult(ReturnCode(ReturnCode::OK, 
"whee")); + _topLinks[0]->sendDown(reply); + } + + _topLinks[0]->waitForMessages(8, _messageWaitTime); // 4 merges, 4 replies + waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime); + + for (int i = 0; i < 4; ++i) { + StorageMessage::SP cmd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint8_t(sortedPris[i]), cmd->getPriority()); + } +} + +// Test that we can detect and reject merges that due to resending +// and potential priority queue sneaking etc may end up with duplicates +// in the queue for a merge that is already known. +void +MergeThrottlerTest::testCommandInQueueDuplicateOfKnownMerge() +{ + // Fill up all active merges and 1 queued one + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(2 + i); + nodes.push_back(5 + i); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234)); + cmd->setPriority(100 - i); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 3 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); + + // Add a merge for the same bucket twice to the queue + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(12); + nodes.push_back(123); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf000feee), nodes, 1234)); + _topLinks[0]->sendDown(cmd); + } + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(124); // Different node set doesn't matter + nodes.push_back(14); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf000feee), nodes, 1234)); + _topLinks[0]->sendDown(cmd); + } + + 
waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); + + StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // Remove and success-reply for 2 merges. This will give enough room + // for the 2 first queued merges to be processed, the last one having a + // duplicate in the queue. + for (int i = 0; i < 2; ++i) { + StorageMessage::SP fwd2 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd2))); + reply->setResult(ReturnCode(ReturnCode::OK, "")); + _topLinks[0]->sendDown(reply); + } + + _topLinks[0]->waitForMessages(maxPending + 1, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); + + // Remove all current merge commands/replies so we can work with a clean slate + _topLinks[0]->getRepliesOnce(); + // Send a success-reply for fwd, allowing the duplicate from the queue + // to have its moment to shine only to then be struck down mercilessly + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + reply->setResult(ReturnCode(ReturnCode::OK, "")); + _topLinks[0]->sendDown(reply); + + _topLinks[0]->waitForMessages(2, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime); + + // First reply is the successful merge reply + StorageMessage::SP reply2 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply2).getResult().getResult(), + ReturnCode::OK); + + // Second reply should be the BUSY-rejected duplicate + StorageMessage::SP reply1 = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply1).getResult().getResult(), + ReturnCode::BUSY); + CPPUNIT_ASSERT(static_cast<MergeBucketReply&>(*reply1).getResult() + .getMessage().find("out of date;") != 
std::string::npos); +} + +// Test that sending a merge command to a node not in the set of +// to-be-merged nodes is handled gracefully. +// This is not a scenario that should ever actually happen, but for +// the sake of robustness, include it anyway. +void +MergeThrottlerTest::testInvalidReceiverNode() +{ + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(1); + nodes.push_back(5); + nodes.push_back(9); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baaaa), nodes, 1234)); + + // Send to node with index 0 + _topLinks[0]->sendDown(cmd); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::REJECTED); + CPPUNIT_ASSERT(static_cast<MergeBucketReply&>(*reply).getResult() + .getMessage().find("which is not in its forwarding chain") != std::string::npos); +} + +// Test that the throttling policy kicks in after a certain number of +// merges are forwarded and that the rest are queued in a prioritized +// order. 
+void +MergeThrottlerTest::testForwardQueuedMerge() +{ + // Fill up all active merges and then 3 queued ones + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(2 + i); + nodes.push_back(5 + i); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234)); + cmd->setPriority(100 - i); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 3 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); + + // Merge queue state should not be touched by worker thread now + StorageMessage::SP nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + + StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // Remove all the rest of the active merges + while (!_topLinks[0]->getReplies().empty()) { + _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + } + + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Celebrate good times come on")); + _topLinks[0]->sendDown(reply); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); // Success rewind reply + + // Remove reply bound for distributor + StorageMessage::SP distReply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*distReply).getResult().getResult(), + ReturnCode::OK); + + waitUntilMergeQueueIs(*_throttlers[0], 2, _messageWaitTime); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands()); + 
CPPUNIT_ASSERT_EQUAL(std::size_t(1), _topLinks[0]->getNumReplies()); + + // First queued merge should now have been registered and forwarded + fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + CPPUNIT_ASSERT_EQUAL( + static_cast<const MergeBucketCommand&>(*fwd).getBucketId(), + static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + + CPPUNIT_ASSERT( + static_cast<const MergeBucketCommand&>(*fwd).getNodes() + == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes()); + + // Ensure forwarded merge has a higher priority than the next queued one + CPPUNIT_ASSERT(fwd->getPriority() < _throttlers[0]->getMergeQueue().begin()->_msg->getPriority()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue()); + + /*framework::HttpUrlPath path("?xml"); + _forwarders[0]->reportStatus(std::cerr, path);*/ +} + +void +MergeThrottlerTest::testExecuteQueuedMerge() +{ + MergeThrottler& throttler(*_throttlers[1]); + DummyStorageLink& topLink(*_topLinks[1]); + DummyStorageLink& bottomLink(*_bottomLinks[1]); + + // Fill up all active merges and then 3 queued ones + std::size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(1); + nodes.push_back(5 + i); + nodes.push_back(7 + i); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1)); + cmd->setPriority(250 - i + 5); + topLink.sendDown(cmd); + } + + // Wait till we have maxPending replies and 3 queued + topLink.waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(throttler, 3, _messageWaitTime); + + // Sneak in a higher priority message that is bound to be executed + // on the given node + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(1); + nodes.push_back(0); + std::vector<uint16_t> chain; + 
chain.push_back(0); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0x1337), nodes, 1234, 1, chain)); + cmd->setPriority(0); + topLink.sendDown(cmd); + } + + waitUntilMergeQueueIs(throttler, 4, _messageWaitTime); + + // Merge queue state should not be touched by worker thread now + StorageMessage::SP nextMerge(throttler.getMergeQueue().begin()->_msg); + /*StorageMessage::SP nextMerge; + { + vespalib::LockGuard lock(_throttlers[0]->getStateLock()); + // Dirty: have to check internal state + nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + }*/ + + CPPUNIT_ASSERT_EQUAL( + BucketId(32, 0x1337), + dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + + StorageMessage::SP fwd(topLink.getAndRemoveMessage(MessageType::MERGEBUCKET)); + + // Remove all the rest of the active merges + while (!topLink.getReplies().empty()) { + topLink.getAndRemoveMessage(MessageType::MERGEBUCKET); + } + + // Free up a merge slot + std::shared_ptr<MergeBucketReply> reply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + reply->setResult(ReturnCode(ReturnCode::OK, "Celebrate good times come on")); + topLink.sendDown(reply); + + topLink.waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + // Remove chain reply + StorageMessage::SP distReply(topLink.getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY)); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*distReply).getResult().getResult(), + ReturnCode::OK); + + waitUntilMergeQueueIs(throttler, 3, _messageWaitTime); + bottomLink.waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), topLink.getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), topLink.getNumReplies()); + CPPUNIT_ASSERT_EQUAL(std::size_t(1), bottomLink.getNumCommands()); + + // First queued merge should now have been registered and sent down + StorageMessage::SP cmd(bottomLink.getAndRemoveMessage(MessageType::MERGEBUCKET)); + + 
CPPUNIT_ASSERT_EQUAL( + static_cast<const MergeBucketCommand&>(*cmd).getBucketId(), + static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + + CPPUNIT_ASSERT( + static_cast<const MergeBucketCommand&>(*cmd).getNodes() + == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes()); +} + +void +MergeThrottlerTest::testFlush() +{ + // Fill up all active merges and then 3 queued ones + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1)); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 3 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); + + // Remove all forwarded commands + uint32_t removed = _topLinks[0]->getRepliesOnce().size(); + CPPUNIT_ASSERT(removed >= 5); + + // Flush the storage link, triggering an abort of all commands + // no matter what their current state is. + _topLinks[0]->close(); + _topLinks[0]->flush(); + _topLinks[0]->waitForMessages(maxPending + 3 - removed, _messageWaitTime); + + while (!_topLinks[0]->getReplies().empty()) { + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + ReturnCode::ABORTED, + static_cast<const MergeBucketReply&>(*reply).getResult().getResult()); + } + // NOTE: merges that have been immediately executed (i.e. 
not cycled) + // on the node should _not_ be replied to, since they're not owned + // by the throttler at that point in time +} + +// If a node goes down and another node has a merge chained through it in +// its queue, the original node can receive a final chain hop forwarding +// it knows nothing about when it comes back up. If this is not handled +// properly, it will attempt to forward this node again with a bogus +// index. This should be implicitly handled by checking for a full node +void +MergeThrottlerTest::testUnseenMergeWithNodeInChain() +{ + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(5); + nodes.push_back(9); + std::vector<uint16_t> chain; + chain.push_back(0); + chain.push_back(5); + chain.push_back(9); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xdeadbeef), nodes, 1234, 1, chain)); + + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 9); + + cmd->setAddress(address); + _topLinks[0]->sendDown(cmd); + + // First, test that we get rejected when processing merge immediately + // Should get a rejection in return + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + ReturnCode::REJECTED, + dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); + + // Second, test that we get rejected before queueing up. This is to + // avoid a hypothetical deadlock scenario. 
+ // Fill up all active merges + { + + std::size_t maxPending( + _throttlers[0]->getThrottlePolicy().getMaxPendingCount()); + for (std::size_t i = 0; i < maxPending; ++i) { + std::shared_ptr<MergeBucketCommand> fillCmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234)); + _topLinks[0]->sendDown(fillCmd); + } + } + + _topLinks[0]->sendDown(cmd); + + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + ReturnCode::REJECTED, + dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); +} + +void +MergeThrottlerTest::testMergeWithNewerClusterStateFlushesOutdatedQueued() +{ + // Fill up all active merges and then 3 queued ones with the same + // system state + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + std::vector<api::StorageMessage::Id> ids; + for (std::size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1)); + ids.push_back(cmd->getMsgId()); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 3 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); + + // Send down merge with newer system state + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0x12345678), nodes, 1234, 2)); + ids.push_back(cmd->getMsgId()); + _topLinks[0]->sendDown(cmd); + } + + // Queue should now be flushed with all messages being returned with + // WRONG_DISTRIBUTION + 
_topLinks[0]->waitForMessages(maxPending + 3, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); + + for (int i = 0; i < 3; ++i) { + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::WRONG_DISTRIBUTION); + CPPUNIT_ASSERT_EQUAL(1u, static_cast<MergeBucketReply&>(*reply).getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(ids[maxPending + i], reply->getMsgId()); + } + + CPPUNIT_ASSERT_EQUAL(uint64_t(3), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue()); +} + +void +MergeThrottlerTest::testUpdatedClusterStateFlushesOutdatedQueued() +{ + // State is version 1. Send down several merges with state version 2. + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + std::vector<api::StorageMessage::Id> ids; + for (std::size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 2)); + ids.push_back(cmd->getMsgId()); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 4 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); + + // Send down new system state (also set it explicitly) + _servers[0]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:3")); + std::shared_ptr<api::SetSystemStateCommand> stateCmd( + new api::SetSystemStateCommand(lib::ClusterState("distributor:100 storage:100 version:3"))); + _topLinks[0]->sendDown(stateCmd); + + // Queue should now be flushed with all being replied to with WRONG_DISTRIBUTION + waitUntilMergeQueueIs(*_throttlers[0], 0, _messageWaitTime); 
+ _topLinks[0]->waitForMessages(maxPending + 3, 5); + + for (int i = 0; i < 3; ++i) { + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::WRONG_DISTRIBUTION); + CPPUNIT_ASSERT_EQUAL(2u, static_cast<MergeBucketReply&>(*reply).getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(ids[maxPending + i], reply->getMsgId()); + } + + CPPUNIT_ASSERT_EQUAL(uint64_t(3), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue()); +} + +void +MergeThrottlerTest::test42MergesDoNotTriggerFlush() +{ + // Fill up all active merges and then 1 queued one + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00baa00 + i), nodes, 1234, 1)); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and 1 queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); + + StorageMessage::SP fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // Remove all the rest of the active merges + while (!_topLinks[0]->getReplies().empty()) { + _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); + } + + // Send down a merge with a cluster state version of 0, which should + // be ignored and queued as usual + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xbaaadbed), nodes, 1234, 0)); + _topLinks[0]->sendDown(cmd); + } + + waitUntilMergeQueueIs(*_throttlers[0], 2, 
_messageWaitTime); + + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumReplies()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), _throttlers[0]->getMetrics().local.failures.wrongdistribution.getValue()); +} + +// Test that a merge that arrive with a state version that is less than +// that of the node is rejected immediately +void +MergeThrottlerTest::testOutdatedClusterStateMergesAreRejectedOnArrival() +{ + _servers[0]->setClusterState(lib::ClusterState("distributor:100 storage:100 version:10")); + + // Send down a merge with a cluster state version of 9, which should + // be rejected + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 9)); + _topLinks[0]->sendDown(cmd); + } + + _topLinks[0]->waitForMessages(1, _messageWaitTime); + + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL( + static_cast<MergeBucketReply&>(*reply).getResult().getResult(), + ReturnCode::WRONG_DISTRIBUTION); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), _throttlers[0]->getMetrics().chaining.failures.wrongdistribution.getValue()); +} + +// Test erroneous case where node receives merge where the merge does +// not exist in the state, but it exists in the chain without the chain +// being full. 
This is something that shouldn't happen, but must still +// not crash the node +void +MergeThrottlerTest::testUnknownMergeWithSelfInChain() +{ + BucketId bid(32, 0xbadbed); + + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::vector<uint16_t> chain; + chain.push_back(0); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(bid, nodes, 1234, 1, chain)); + + StorageMessageAddress address("storage", lib::NodeType::STORAGE, 1); + + cmd->setAddress(address); + _topLinks[0]->sendDown(cmd); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + + CPPUNIT_ASSERT_EQUAL( + ReturnCode::REJECTED, + static_cast<MergeBucketReply&>(*reply).getResult().getResult()); +} + +void +MergeThrottlerTest::testBusyReturnedOnFullQueue() +{ + std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + std::size_t maxQueue = _throttlers[0]->getMaxQueueSize(); + CPPUNIT_ASSERT(maxPending < 100); + for (std::size_t i = 0; i < maxPending + maxQueue; ++i) { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf00000 + i), nodes, 1234, 1)); + _topLinks[0]->sendDown(cmd); + } + + // Wait till we have maxPending replies and maxQueue queued + _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime); + + // Clear all forwarded merges + _topLinks[0]->getRepliesOnce(); + // Send down another merge which should be immediately busy-returned + { + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(0); + nodes.push_back(1); + nodes.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xf000baaa), nodes, 1234, 
1)); + _topLinks[0]->sendDown(cmd); + } + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + StorageMessage::SP reply = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + + CPPUNIT_ASSERT_EQUAL( + BucketId(32, 0xf000baaa), + static_cast<MergeBucketReply&>(*reply).getBucketId()); + + CPPUNIT_ASSERT_EQUAL( + ReturnCode::BUSY, + static_cast<MergeBucketReply&>(*reply).getResult().getResult()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), + _throttlers[0]->getMetrics().chaining + .failures.busy.getValue()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1), + _throttlers[0]->getMetrics().local + .failures.busy.getValue()); +} + +void +MergeThrottlerTest::testBrokenCycle() +{ + std::vector<MergeBucketCommand::Node> nodes; + nodes.push_back(1); + nodes.push_back(0); + nodes.push_back(2); + { + std::vector<uint16_t> chain; + chain.push_back(0); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain)); + _topLinks[1]->sendDown(cmd); + } + + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + StorageMessage::SP fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex()); + + // Send cycled merge which will be executed + { + std::vector<uint16_t> chain; + chain.push_back(0); + chain.push_back(1); + chain.push_back(2); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain)); + _topLinks[1]->sendDown(cmd); + } + + _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + StorageMessage::SP cycled = _bottomLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + + // Now, node 2 goes down, auto sending back a failed merge + std::shared_ptr<MergeBucketReply> nodeDownReply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*fwd))); + nodeDownReply->setResult(ReturnCode(ReturnCode::NOT_CONNECTED, "Node went 
sightseeing")); + + _topLinks[1]->sendDown(nodeDownReply); + // Merge reply also arrives from persistence + std::shared_ptr<MergeBucketReply> persistenceReply( + new MergeBucketReply(dynamic_cast<const MergeBucketCommand&>(*cycled))); + persistenceReply->setResult(ReturnCode(ReturnCode::ABORTED, "Oh dear")); + _bottomLinks[1]->sendUp(persistenceReply); + + // Should now be two replies from node 1, one to node 2 and one to node 0 + // since we must handle broken chains + _topLinks[1]->waitForMessages(2, _messageWaitTime); + // Unwind reply shares the result of the persistence reply + for (int i = 0; i < 2; ++i) { + StorageMessage::SP reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(ReturnCode::ABORTED, "Oh dear"), + static_cast<MergeBucketReply&>(*reply).getResult()); + } + + // Make sure it has been removed from the internal state so we can + // send new merges for the bucket + { + std::vector<uint16_t> chain; + chain.push_back(0); + std::shared_ptr<MergeBucketCommand> cmd( + new MergeBucketCommand(BucketId(32, 0xfeef00), nodes, 1234, 1, chain)); + _topLinks[1]->sendDown(cmd); + } + + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, 5); + fwd = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET); + CPPUNIT_ASSERT_EQUAL(uint16_t(2), fwd->getAddress()->getIndex()); +} + +void +MergeThrottlerTest::sendAndExpectReply( + const std::shared_ptr<api::StorageMessage>& msg, + const api::MessageType& expectedReplyType, + api::ReturnCode::Result expectedResultCode) +{ + _topLinks[0]->sendDown(msg); + _topLinks[0]->waitForMessage(expectedReplyType, _messageWaitTime); + StorageMessage::SP reply(_topLinks[0]->getAndRemoveMessage( + expectedReplyType)); + api::StorageReply& storageReply( + dynamic_cast<api::StorageReply&>(*reply)); + CPPUNIT_ASSERT_EQUAL(expectedResultCode, + storageReply.getResult().getResult()); +} + +void +MergeThrottlerTest::testGetBucketDiffCommandNotInActiveSetIsRejected() +{ + 
document::BucketId bucket(16, 1234); + std::vector<api::GetBucketDiffCommand::Node> nodes; + std::shared_ptr<api::GetBucketDiffCommand> getDiffCmd( + new api::GetBucketDiffCommand(bucket, nodes, api::Timestamp(1234))); + + sendAndExpectReply(getDiffCmd, + api::MessageType::GETBUCKETDIFF_REPLY, + api::ReturnCode::ABORTED); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[0]->getNumCommands()); +} + +void +MergeThrottlerTest::testApplyBucketDiffCommandNotInActiveSetIsRejected() +{ + document::BucketId bucket(16, 1234); + std::vector<api::GetBucketDiffCommand::Node> nodes; + std::shared_ptr<api::ApplyBucketDiffCommand> applyDiffCmd( + new api::ApplyBucketDiffCommand(bucket, nodes, api::Timestamp(1234))); + + sendAndExpectReply(applyDiffCmd, + api::MessageType::APPLYBUCKETDIFF_REPLY, + api::ReturnCode::ABORTED); + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _bottomLinks[0]->getNumCommands()); +} + +api::MergeBucketCommand::SP +MergeThrottlerTest::sendMerge(const MergeBuilder& builder) +{ + api::MergeBucketCommand::SP cmd(builder.create()); + _topLinks[builder._nodes[0]]->sendDown(cmd); + return cmd; +} + +void +MergeThrottlerTest::testNewClusterStateAbortsAllOutdatedActiveMerges() +{ + document::BucketId bucket(16, 6789); + _throttlers[0]->getThrottlePolicy().setMaxPendingCount(1); + + // Merge will be forwarded (i.e. active). 
+ sendMerge(MergeBuilder(bucket).clusterStateVersion(10)); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + StorageMessage::SP fwd(_topLinks[0]->getAndRemoveMessage( + MessageType::MERGEBUCKET)); + + _topLinks[0]->sendDown(makeSystemStateCmd( + "version:11 distributor:100 storage:100")); + // Cannot send reply until we're unwinding + CPPUNIT_ASSERT_EQUAL(std::size_t(0), _topLinks[0]->getNumReplies()); + + // Trying to diff the bucket should now fail + { + std::shared_ptr<api::GetBucketDiffCommand> getDiffCmd( + new api::GetBucketDiffCommand(bucket, {}, api::Timestamp(123))); + + sendAndExpectReply(getDiffCmd, + api::MessageType::GETBUCKETDIFF_REPLY, + api::ReturnCode::ABORTED); + } +} + +// TODO test message queue aborting (use rendezvous functionality--make guard) + +} // namespace storage diff --git a/storage/src/tests/storageserver/priorityconvertertest.cpp b/storage/src/tests/storageserver/priorityconvertertest.cpp new file mode 100644 index 00000000000..ecbdcfb6b91 --- /dev/null +++ b/storage/src/tests/storageserver/priorityconvertertest.cpp @@ -0,0 +1,104 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/storage/storageserver/priorityconverter.h> +#include <tests/common/testhelper.h> + +namespace storage { + +struct PriorityConverterTest : public CppUnit::TestFixture +{ + std::unique_ptr<PriorityConverter> _converter; + + void setUp() { + vdstestlib::DirConfig config(getStandardConfig(true)); + _converter.reset(new PriorityConverter(config.getConfigId())); + }; + + void testNormalUsage(); + void testLowestPriorityIsReturnedForUnknownCode(); + + CPPUNIT_TEST_SUITE(PriorityConverterTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST(testLowestPriorityIsReturnedForUnknownCode); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(PriorityConverterTest); + +void PriorityConverterTest::testNormalUsage() +{ + for (int p=0; p<16; ++p) { + CPPUNIT_ASSERT_EQUAL( + (uint8_t)(50+p*10), + _converter->toStoragePriority( + static_cast<documentapi::Priority::Value>(p))); + } + for (int i=0; i<256; ++i) { + uint8_t p = i; + if (p <= 50) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGHEST, + _converter->toDocumentPriority(p)); + } else if (p <= 60) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_VERY_HIGH, + _converter->toDocumentPriority(p)); + } else if (p <= 70) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_1, + _converter->toDocumentPriority(p)); + } else if (p <= 80) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_2, + _converter->toDocumentPriority(p)); + } else if (p <= 90) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_HIGH_3, + _converter->toDocumentPriority(p)); + } else if (p <= 100) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_1, + _converter->toDocumentPriority(p)); + } else if (p <= 110) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_2, + _converter->toDocumentPriority(p)); + } else if (p <= 120) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_3, + _converter->toDocumentPriority(p)); 
+ } else if (p <= 130) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_4, + _converter->toDocumentPriority(p)); + } else if (p <= 140) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_5, + _converter->toDocumentPriority(p)); + } else if (p <= 150) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_NORMAL_6, + _converter->toDocumentPriority(p)); + } else if (p <= 160) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_1, + _converter->toDocumentPriority(p)); + } else if (p <= 170) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_2, + _converter->toDocumentPriority(p)); + } else if (p <= 180) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOW_3, + _converter->toDocumentPriority(p)); + } else if (p <= 190) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_VERY_LOW, + _converter->toDocumentPriority(p)); + } else if (p <= 200) { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOWEST, + _converter->toDocumentPriority(p)); + } else { + CPPUNIT_ASSERT_EQUAL(documentapi::Priority::PRI_LOWEST, + _converter->toDocumentPriority(p)); + } + } +} + + +void +PriorityConverterTest::testLowestPriorityIsReturnedForUnknownCode() +{ + CPPUNIT_ASSERT_EQUAL(255, + static_cast<int>(_converter->toStoragePriority( + static_cast<documentapi::Priority::Value>(123)))); +} + +} diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp new file mode 100644 index 00000000000..68a35ac37d9 --- /dev/null +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -0,0 +1,264 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <boost/pointer_cast.hpp> +#include <cppunit/extensions/HelperMacros.h> +#include <iostream> +#include <vespa/metrics/metricmanager.h> +#include <string> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/vdslib/state/nodestate.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/storage/storageserver/statemanager.h> +#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> +#include <tests/common/dummystoragelink.h> +#include <vespa/vespalib/data/slime/type.h> + +using storage::lib::NodeState; +using storage::lib::NodeType; +using storage::lib::State; +using storage::lib::ClusterState; + +namespace storage { + +struct StateManagerTest : public CppUnit::TestFixture { + std::unique_ptr<TestServiceLayerApp> _node; + std::unique_ptr<DummyStorageLink> _upper; + std::unique_ptr<metrics::MetricManager> _metricManager; + StateManager* _manager; + DummyStorageLink* _lower; + + StateManagerTest(); + + void setUp(); + void tearDown(); + + void testSystemState(); + void testReportedNodeState(); + void testClusterStateVersion(); + + CPPUNIT_TEST_SUITE(StateManagerTest); + CPPUNIT_TEST(testSystemState); + CPPUNIT_TEST(testReportedNodeState); + CPPUNIT_TEST(testClusterStateVersion); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(StateManagerTest); + +StateManagerTest::StateManagerTest() + : _node(), + _upper(), + _manager(0), + _lower(0) +{ +} + +void +StateManagerTest::setUp() { + try{ + vdstestlib::DirConfig config(getStandardConfig(true)); + _node.reset(new TestServiceLayerApp(DiskCount(1), NodeIndex(2))); + // Clock will increase 1 sec per call. 
+ _node->getClock().setAbsoluteTimeInSeconds(1); + _metricManager.reset(new metrics::MetricManager); + _upper.reset(new DummyStorageLink()); + _manager = new StateManager(_node->getComponentRegister(), + *_metricManager, + std::unique_ptr<HostInfo>(new HostInfo)); + _lower = new DummyStorageLink(); + _upper->push_back(StorageLink::UP(_manager)); + _upper->push_back(StorageLink::UP(_lower)); + _upper->open(); + } catch (std::exception& e) { + std::cerr << "Failed to static initialize objects: " << e.what() + << "\n"; + } +} + +void +StateManagerTest::tearDown() { + CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _lower->getNumCommands()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumCommands()); + _manager = 0; + _lower = 0; + _upper->close(); + _upper->flush(); + _upper.reset(0); + _node.reset(0); + _metricManager.reset(); +} + +#define GET_ONLY_OK_REPLY(varname) \ +{ \ + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \ + CPPUNIT_ASSERT(_upper->getReply(0)->getType().isReply()); \ + varname = std::dynamic_pointer_cast<api::StorageReply>( \ + _upper->getReply(0)); \ + CPPUNIT_ASSERT(varname != 0); \ + _upper->reset(); \ + CPPUNIT_ASSERT_EQUAL(api::ReturnCode(api::ReturnCode::OK), \ + varname->getResult()); \ +} + +void +StateManagerTest::testSystemState() +{ + std::shared_ptr<api::StorageReply> reply; + // Verify initial state on startup + ClusterState::CSP currentState = _manager->getSystemState(); + CPPUNIT_ASSERT_EQUAL(std::string("cluster:d"), + currentState->toString(false)); + + NodeState::CSP currentNodeState = _manager->getCurrentNodeState(); + CPPUNIT_ASSERT_EQUAL(std::string("s:d"), currentNodeState->toString(false)); + + ClusterState sendState("storage:4 .2.s:m"); + std::shared_ptr<api::SetSystemStateCommand> cmd( + new api::SetSystemStateCommand(sendState)); + _upper->sendDown(cmd); + GET_ONLY_OK_REPLY(reply); + + currentState = 
_manager->getSystemState(); + CPPUNIT_ASSERT_EQUAL(sendState, *currentState); + + currentNodeState = _manager->getCurrentNodeState(); + CPPUNIT_ASSERT_EQUAL(std::string("s:m"), currentNodeState->toString(false)); +} + +namespace { + struct MyStateListener : public StateListener { + const NodeStateUpdater& updater; + lib::NodeState current; + std::ostringstream ost; + + MyStateListener(const NodeStateUpdater& upd) + : updater(upd), current(*updater.getReportedNodeState()) {} + + void handleNewState() + { + ost << current << " -> "; + current = *updater.getReportedNodeState(); + ost << current << "\n"; + } + }; +} + +void +StateManagerTest::testReportedNodeState() +{ + std::shared_ptr<api::StorageReply> reply; + // Add a state listener to check that we get events. + MyStateListener stateListener(*_manager); + _manager->addStateListener(stateListener); + // Test that initial state is initializing + NodeState::CSP nodeState = _manager->getReportedNodeState(); + CPPUNIT_ASSERT_EQUAL(std::string("s:i b:58 i:0 t:1"), nodeState->toString(false)); + // Test that it works to update the state + { + NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock()); + NodeState ns(*_manager->getReportedNodeState()); + ns.setState(State::UP); + _manager->setReportedNodeState(ns); + } + // And that we get the change both through state interface + nodeState = _manager->getReportedNodeState(); + CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"), + nodeState->toString(false)); + // And get node state command (no expected state) + std::shared_ptr<api::GetNodeStateCommand> cmd( + new api::GetNodeStateCommand(lib::NodeState::UP())); + _upper->sendDown(cmd); + GET_ONLY_OK_REPLY(reply); + CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY, + reply->getType()); + nodeState.reset(new NodeState( + dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState())); + CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"), + nodeState->toString(false)); + // We should also get it with wrong 
expected state + cmd.reset(new api::GetNodeStateCommand(lib::NodeState::UP(new NodeState(NodeType::STORAGE, State::INITIALIZING)))); + _upper->sendDown(cmd); + GET_ONLY_OK_REPLY(reply); + CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY, + reply->getType()); + nodeState.reset(new NodeState( + dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState())); + CPPUNIT_ASSERT_EQUAL(std::string("s:u b:58 t:1"), + nodeState->toString(false)); + // With correct wanted state we should not get response right away + cmd.reset(new api::GetNodeStateCommand( + lib::NodeState::UP(new NodeState("s:u b:58 t:1", &NodeType::STORAGE)))); + _upper->sendDown(cmd); + CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); + // But when we update state, we get the reply + { + NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock()); + NodeState ns(*_manager->getReportedNodeState()); + ns.setState(State::STOPPING); + ns.setDescription("Stopping node"); + _manager->setReportedNodeState(ns); + } + + GET_ONLY_OK_REPLY(reply); + CPPUNIT_ASSERT_EQUAL(api::MessageType::GETNODESTATE_REPLY, + reply->getType()); + nodeState.reset(new NodeState( + dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState())); + CPPUNIT_ASSERT_EQUAL(std::string("s:s b:58 t:1 m:Stopping\\x20node"), + nodeState->toString(false)); + + // Removing state listener, it stops getting updates + _manager->removeStateListener(stateListener); + // Do another update which listener should not get.. 
+ { + NodeStateUpdater::Lock::SP lock(_manager->grabStateChangeLock()); + NodeState ns(*_manager->getReportedNodeState()); + ns.setState(State::UP); + _manager->setReportedNodeState(ns); + } + std::string expectedEvents = + "s:i b:58 i:0 t:1 -> s:u b:58 t:1\n" + "s:u b:58 t:1 -> s:s b:58 t:1 m:Stopping\\x20node\n"; + CPPUNIT_ASSERT_EQUAL(expectedEvents, stateListener.ost.str()); +} + +void +StateManagerTest::testClusterStateVersion() +{ + ClusterState state(*_manager->getSystemState()); + state.setVersion(123); + _manager->setClusterState(state); + + std::string nodeInfoString(_manager->getNodeInfo()); + vespalib::slime::Memory goldenMemory(nodeInfoString); + vespalib::Slime nodeInfo; + vespalib::slime::JsonFormat::decode(nodeInfoString, nodeInfo); + + vespalib::slime::Symbol lookupSymbol = + nodeInfo.lookup("cluster-state-version"); + if (lookupSymbol.undefined()) { + CPPUNIT_FAIL("No cluster-state-version was found in the node info"); + } + + auto& cursor = nodeInfo.get(); + auto& clusterStateVersionCursor = cursor["cluster-state-version"]; + if (!clusterStateVersionCursor.valid()) { + CPPUNIT_FAIL("No cluster-state-version was found in the node info"); + } + + if (clusterStateVersionCursor.type().getId() != vespalib::slime::LONG::ID) { + CPPUNIT_FAIL("No cluster-state-version was found in the node info"); + } + + int version = clusterStateVersionCursor.asLong(); + CPPUNIT_ASSERT_EQUAL(123, version); +} + +} // storage + diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp new file mode 100644 index 00000000000..ef1592bce80 --- /dev/null +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -0,0 +1,279 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include <cppunit/extensions/HelperMacros.h> +#include <vespa/log/log.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/storageserver/applicationgenerationfetcher.h> +#include <vespa/storage/storageserver/statereporter.h> +#include <tests/common/teststorageapp.h> +#include <tests/common/testhelper.h> +#include <tests/common/dummystoragelink.h> + +LOG_SETUP(".test.statereporter"); + +namespace storage { + +class DummyApplicationGenerationFether : public ApplicationGenerationFetcher { +public: + virtual int64_t getGeneration() const { return 1; } + virtual std::string getComponentName() const { return "component"; } +}; + +struct StateReporterTest : public CppUnit::TestFixture { + FastOS_ThreadPool _threadPool; + framework::defaultimplementation::FakeClock* _clock; + std::unique_ptr<TestServiceLayerApp> _node; + std::unique_ptr<DummyStorageLink> _top; + DummyApplicationGenerationFether _generationFetcher; + std::unique_ptr<StateReporter> _stateReporter; + std::unique_ptr<vdstestlib::DirConfig> _config; + std::unique_ptr<metrics::MetricSet> _topSet; + std::unique_ptr<metrics::MetricManager> _metricManager; + std::shared_ptr<FileStorMetrics> _filestorMetrics; + + StateReporterTest(); + + void setUp(); + void tearDown(); + void runLoad(uint32_t count = 1); + + void testReportConfigGeneration(); + void testReportHealth(); + void testReportMetrics(); + + CPPUNIT_TEST_SUITE(StateReporterTest); + CPPUNIT_TEST(testReportConfigGeneration); + CPPUNIT_TEST(testReportHealth); + CPPUNIT_TEST(testReportMetrics); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(StateReporterTest); + +namespace { + struct MetricClock : public metrics::MetricManager::Timer + { + framework::Clock& _clock; + MetricClock(framework::Clock& c) : _clock(c) {} + virtual time_t getTime() const + { return 
_clock.getTimeInSeconds().getTime(); } + virtual time_t getTimeInMilliSecs() const + { return _clock.getTimeInMillis().getTime(); } + }; +} + +StateReporterTest::StateReporterTest() + : _threadPool(256*1024), + _clock(0), + _top(), + _stateReporter() +{ +} + +void StateReporterTest::setUp() { + assert(system("rm -rf vdsroot") == 0); + _config.reset(new vdstestlib::DirConfig(getStandardConfig(true))); + try { + _node.reset(new TestServiceLayerApp(DiskCount(4), NodeIndex(0), + _config->getConfigId())); + _node->setupDummyPersistence(); + _clock = &_node->getClock(); + _clock->setAbsoluteTimeInSeconds(1000000); + _top.reset(new DummyStorageLink); + } catch (config::InvalidConfigException& e) { + fprintf(stderr, "%s\n", e.what()); + } + _metricManager.reset(new metrics::MetricManager( + std::unique_ptr<metrics::MetricManager::Timer>( + new MetricClock(*_clock)))); + _topSet.reset(new metrics::MetricSet("vds", "", "")); + { + metrics::MetricLockGuard guard(_metricManager->getMetricLock()); + _metricManager->registerMetric(guard, *_topSet); + } + + _stateReporter.reset(new StateReporter( + _node->getComponentRegister(), + *_metricManager, + _generationFetcher, + "status")); + + uint16_t diskCount = _node->getPartitions().size(); + documentapi::LoadTypeSet::SP loadTypes(_node->getLoadTypes()); + + _filestorMetrics.reset(new FileStorMetrics( + _node->getLoadTypes()->getMetricLoadTypes())); + _filestorMetrics->initDiskMetrics( + diskCount, loadTypes->getMetricLoadTypes(), 1); + _topSet->registerMetric(*_filestorMetrics); + + _metricManager->init(_config->getConfigId(), _node->getThreadPool()); +} + +void StateReporterTest::tearDown() { + _metricManager->stop(); + _stateReporter.reset(0); + _topSet.reset(0); + _metricManager.reset(0); + _top.reset(0); + _node.reset(0); + _config.reset(0); + _filestorMetrics.reset(); +} + +#define PARSE_JSON(jsonData) \ +vespalib::Slime slime; \ +{ \ + using namespace vespalib::slime; \ + size_t parsed = JsonFormat::decode(Memory(jsonData), 
        slime); \
    SimpleBuffer buffer; \
    JsonFormat::encode(slime, buffer, false); \
    if (jsonData.size() != parsed) { \
        std::ostringstream error; \
        error << "Failed to parse JSON: '\n" \
              << jsonData << "'\n:" << buffer.get().make_string() << "\n"; \
        CPPUNIT_ASSERT_EQUAL_MSG(error.str(), jsonData.size(), parsed); \
    } \
}

// Asserts that the reported config generation for `component` in the
// /state/v1/config JSON equals `generation` (compared as double, since
// slime exposes numbers as doubles).
#define ASSERT_GENERATION(jsonData, component, generation) \
{ \
    PARSE_JSON(jsonData); \
    CPPUNIT_ASSERT_EQUAL( \
        generation, \
        slime.get()["config"][component]["generation"].asDouble()); \
}

// Asserts the "status" code and message in the /state/v1/health JSON.
#define ASSERT_NODE_STATUS(jsonData, code, message) \
{ \
    PARSE_JSON(jsonData); \
    CPPUNIT_ASSERT_EQUAL( \
        vespalib::string(code), \
        slime.get()["status"]["code"].asString().make_string()); \
    CPPUNIT_ASSERT_EQUAL( \
        vespalib::string(message), \
        slime.get()["status"]["message"].asString().make_string()); \
}

// Scans the /state/v1/metrics JSON for the filestor get/put sum counters and
// asserts their values. Also sanity-checks that a substantial number (>100)
// of metrics was reported, i.e. that the consumer actually exposed the full
// metric set and not just a fragment.
#define ASSERT_METRIC_GET_PUT(jsonData, expGetCount, expPutCount) \
{ \
    PARSE_JSON(jsonData); \
    double getCount = -1; \
    double putCount = -1; \
    size_t metricCount = slime.get()["metrics"]["values"].children(); \
    /*std::cerr << "\nmetric count=" << metricCount << "\n";*/ \
    for (size_t j=0; j<metricCount; j++) { \
        const vespalib::string name = slime.get()["metrics"]["values"][j]["name"] \
                                      .asString().make_string(); \
        if (name.compare("vds.filestor.alldisks.allthreads." \
                         "get.sum.count") == 0) \
        { \
            getCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
                       .asDouble(); \
        } else if (name.compare("vds.filestor.alldisks.allthreads." \
                                "put.sum.count") == 0) \
        { \
            putCount = slime.get()["metrics"]["values"][j]["values"]["count"] \
                       .asDouble(); \
        } \
    } \
    CPPUNIT_ASSERT_EQUAL(expGetCount, getCount); \
    CPPUNIT_ASSERT_EQUAL(expPutCount, putCount); \
    CPPUNIT_ASSERT(metricCount > 100); \
}

// Verifies that /state/v1/config reports the expected config generation (1.0
// for the freshly configured test node).
void StateReporterTest::testReportConfigGeneration() {
    std::ostringstream ost;
    framework::HttpUrlPath path("/state/v1/config");
    _stateReporter->reportStatus(ost, path);
    std::string jsonData = ost.str();
    //std::cerr << "\nConfig: " << jsonData << "\n";
    ASSERT_GENERATION(jsonData, "component", 1.0);
}

// Drives the node through every lib::State and verifies the health endpoint's
// code/message pair: only State::UP maps to "up"; all other states report
// "down" with a state-specific message.
void StateReporterTest::testReportHealth() {
    const int stateCount = 7;
    const lib::NodeState nodeStates[stateCount] = {
        lib::NodeState(lib::NodeType::STORAGE, lib::State::UNKNOWN),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::MAINTENANCE),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::DOWN),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::STOPPING),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::INITIALIZING),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::RETIRED),
        lib::NodeState(lib::NodeType::STORAGE, lib::State::UP)
    };
    const char* codes[stateCount] = {
        "down",
        "down",
        "down",
        "down",
        "down",
        "down",
        "up"
    };
    const char* messages[stateCount] = {
        "Node state: Unknown",
        "Node state: Maintenance",
        "Node state: Down",
        "Node state: Stopping",
        "Node state: Initializing, init progress 0",
        "Node state: Retired",
        ""
    };

    framework::HttpUrlPath path("/state/v1/health");
    for (int i=0; i<stateCount; i++) {
        _node->getStateUpdater().setCurrentNodeState(nodeStates[i]);
        std::ostringstream ost;
        _stateReporter->reportStatus(ost, path);
        std::string jsonData = ost.str();
        //std::cerr << "\nHealth " << i << ":" << jsonData << "\n";
        ASSERT_NODE_STATUS(jsonData, codes[i], messages[i]);
    }
}

// Bumps a get counter, forces a 5 minute metric snapshot via the fake clock,
// then bumps a put counter *after* the snapshot. Both metric paths (default
// and ?consumer=status) must then report get=1 / put=0, proving the snapshot
// boundary is respected.
void StateReporterTest::testReportMetrics() {
    FileStorDiskMetrics& disk0(*_filestorMetrics->disks[0]);
    FileStorThreadMetrics& thread0(*disk0.threads[0]);

    LOG(info, "Adding to get metric");

    using documentapi::LoadType;
    thread0.get[LoadType::DEFAULT].count.inc(1);

    LOG(info, "Waiting for 5 minute snapshot to be taken");
    // Wait until active metrics have been added to 5 min snapshot and reset.
    // Advancing the fake clock one minute at a time and polling
    // getLastProcessedTime() ensures the metric manager thread has actually
    // consumed each time step before we proceed.
    for (uint32_t i=0; i<6; ++i) {
        _clock->addSecondsToTime(60);
        _metricManager->timeChangedNotification();
        while (
            uint64_t(_metricManager->getLastProcessedTime())
                < _clock->getTimeInSeconds().getTime())
        {
            FastOS_Thread::Sleep(1);
        }
    }
    LOG(info, "5 minute snapshot should have been taken. Adding put count");

    thread0.put[LoadType::DEFAULT].count.inc(1);

    const int pathCount = 2;
    const char* paths[pathCount] = {
        "/state/v1/metrics",
        "/state/v1/metrics?consumer=status"
    };

    for (int i=0; i<pathCount; i++) {
        framework::HttpUrlPath path(paths[i]);
        std::ostringstream ost;
        _stateReporter->reportStatus(ost, path);
        std::string jsonData = ost.str();
        //std::cerr << "\nMetrics:" << jsonData << "\n";
        ASSERT_METRIC_GET_PUT(jsonData, 1.0, 0.0);
    }
}

} // storage

// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+ +#include <vespa/fastos/fastos.h> +#include <tests/storageserver/testvisitormessagesession.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> + +namespace storage { + +TestVisitorMessageSession::~TestVisitorMessageSession() +{ +} + +TestVisitorMessageSession::TestVisitorMessageSession(VisitorThread& t, + Visitor& v, + const mbus::Error& autoReplyError, + bool autoReply) + : _autoReplyError(autoReplyError), + _autoReply(autoReply), + thread(t), + visitor(v), + pendingCount(0) +{ +} + +void +TestVisitorMessageSession::reply(mbus::Reply::UP rep) { + { + vespalib::MonitorGuard guard(_waitMonitor); + pendingCount--; + } + thread.handleMessageBusReply(std::move(rep), visitor); +} + +mbus::Result +TestVisitorMessageSession::send( + std::unique_ptr<documentapi::DocumentMessage> message) +{ + vespalib::MonitorGuard guard(_waitMonitor); + if (_autoReply) { + pendingCount++; + mbus::Reply::UP rep = message->createReply(); + rep->setMessage(mbus::Message::UP(message.release())); + if (_autoReplyError.getCode() == mbus::ErrorCode::NONE) { + reply(std::move(rep)); + return mbus::Result(); + } else { + return mbus::Result(_autoReplyError, + std::unique_ptr<mbus::Message>(message.release())); + } + } else { + pendingCount++; + sentMessages.push_back( + vespalib::LinkedPtr<documentapi::DocumentMessage>( + message.release())); + guard.broadcast(); + return mbus::Result(); + } +} + +void +TestVisitorMessageSession::waitForMessages(unsigned int msgCount) { + framework::defaultimplementation::RealClock clock; + framework::MilliSecTime endTime( + clock.getTimeInMillis() + framework::MilliSecTime(60 * 1000)); + + vespalib::MonitorGuard guard(_waitMonitor); + while (sentMessages.size() < msgCount) { + if (clock.getTimeInMillis() > endTime) { + throw vespalib::IllegalStateException( + vespalib::make_string("Timed out waiting for %u messages " + "in test visitor session", msgCount), + VESPA_STRLOC); + } + guard.wait(1000); + } +}; + +} diff --git 
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include <deque>
#include <vespa/storage/visiting/visitormessagesession.h>
#include <vespa/storage/visiting/visitorthread.h>
#include <vespa/documentapi/messagebus/messages/documentmessage.h>
#include <vespa/storage/storageserver/priorityconverter.h>

namespace storage {

/**
 * Test double for VisitorMessageSession.
 *
 * Either auto-replies to every sent message (optionally with a configured
 * error), or queues sent messages on sentMessages so a test can wait for and
 * inspect them via waitForMessages()/getMonitor().
 */
class TestVisitorMessageSession : public VisitorMessageSession
{
private:
    vespalib::Monitor _waitMonitor;   // guards pendingCount/sentMessages; waiters signalled on send
    mbus::Error _autoReplyError;      // error to fail sends with in auto-reply mode (NONE = succeed)
    bool _autoReply;                  // true: answer sends immediately; false: capture them

public:
    typedef std::unique_ptr<TestVisitorMessageSession> UP;

    VisitorThread& thread;            // receives the replies
    Visitor& visitor;                 // visitor owning this session
    uint32_t pendingCount;            // sends not yet answered by reply()

    ~TestVisitorMessageSession();

    // Messages captured in non-auto-reply mode, in send order.
    std::deque<vespalib::LinkedPtr<documentapi::DocumentMessage> > sentMessages;

    TestVisitorMessageSession(VisitorThread& t,
                              Visitor& v,
                              const mbus::Error& autoReplyError,
                              bool autoReply);

    // Deliver a reply to the visitor thread, decrementing pendingCount.
    void reply(mbus::Reply::UP rep);

    uint32_t pending() { return pendingCount; }

    mbus::Result send(std::unique_ptr<documentapi::DocumentMessage> message);

    // Block (up to an internal timeout) until msgCount messages are captured.
    void waitForMessages(unsigned int msgCount);

    vespalib::Monitor& getMonitor() { return _waitMonitor; }
};

/**
 * Factory handed to the visitor subsystem under test; records every session
 * it creates so tests can reach into them afterwards.
 *
 * NOTE(review): _visitorSessions holds raw pointers to sessions whose
 * ownership is transferred to the caller of createSession — entries dangle
 * once a session is destroyed; verify test lifetimes keep sessions alive
 * while inspected.
 */
struct TestVisitorMessageSessionFactory : public VisitorMessageSessionFactory
{
    vespalib::Lock _accessLock;                          // guards _visitorSessions
    std::vector<TestVisitorMessageSession*> _visitorSessions; // non-owning; see class note
    mbus::Error _autoReplyError;                         // propagated to created sessions
    bool _createAutoReplyVisitorSessions;                // sessions auto-reply iff true
    PriorityConverter _priConverter;

    TestVisitorMessageSessionFactory(vespalib::stringref configId = "")
        : _createAutoReplyVisitorSessions(false),
          _priConverter(configId) {}

    // Create a session wired to the given visitor/thread and remember it.
    VisitorMessageSession::UP createSession(Visitor& v, VisitorThread& vt) {
        vespalib::LockGuard lock(_accessLock);
        TestVisitorMessageSession::UP session(
                new TestVisitorMessageSession(
                        vt,
                        v,
                        _autoReplyError,
                        _createAutoReplyVisitorSessions));
        _visitorSessions.push_back(session.get());
        return VisitorMessageSession::UP(std::move(session));
    }

    // Map a storage priority to a documentapi priority via the converter.
    documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const
    {
        return _priConverter.toDocumentPriority(storagePriority);
    }

};

} // storage