diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/distributor/mergeoperationtest.cpp |
Publish
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/mergeoperationtest.cpp | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp new file mode 100644 index 00000000000..a2373731bc3 --- /dev/null +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -0,0 +1,430 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <boost/lexical_cast.hpp> +#include <cppunit/extensions/HelperMacros.h> +#include <iomanip> +#include <iostream> +#include <memory> +#include <tests/common/dummystoragelink.h> +#include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> +#include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storage/distributor/bucketdbupdater.h> +#include <tests/distributor/distributortestutil.h> +#include <vespa/vespalib/text/stringtokenizer.h> + +using std::shared_ptr; + +namespace storage { +namespace distributor { + +class MergeOperationTest : public CppUnit::TestFixture, + public DistributorTestUtil +{ + CPPUNIT_TEST_SUITE(MergeOperationTest); + CPPUNIT_TEST(testSimple); + CPPUNIT_TEST(testFailIfSourceOnlyCopiesChanged); + CPPUNIT_TEST(testGenerateNodeList); + CPPUNIT_TEST(doNotRemoveCopiesWithPendingMessages); + CPPUNIT_TEST(testDoNotRemoveActiveSourceOnlyCopies); + CPPUNIT_TEST(testMarkRedundantTrustedCopiesAsSourceOnly); + CPPUNIT_TEST(onlyMarkRedundantRetiredReplicasAsSourceOnly); + CPPUNIT_TEST_SUITE_END(); + + std::unique_ptr<PendingMessageTracker> _pendingTracker; + +protected: + void testSimple(); + void testFailIfSourceOnlyCopiesChanged(); + void testGenerateNodeList(); + void doNotRemoveCopiesWithPendingMessages(); + void testDoNotRemoveActiveSourceOnlyCopies(); + void testMarkRedundantTrustedCopiesAsSourceOnly(); + void onlyMarkRedundantRetiredReplicasAsSourceOnly(); + +public: + void setUp() { + createLinks(); + _pendingTracker.reset(new PendingMessageTracker(getComponentRegister())); + _sender.setPendingMessageTracker(*_pendingTracker); + } + + void tearDown() { + close(); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MergeOperationTest); + +void +MergeOperationTest::testSimple() +{ + getClock().setAbsoluteTimeInSeconds(10); + + addNodesToBucketDB(document::BucketId(16, 1), + "0=10/1/1/t," + "1=20/1/1," + "2=10/1/1/t"); + + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + + MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + CPPUNIT_ASSERT_EQUAL( + std::string( + "MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " + "reasons to start: ) => 0"), + _sender.getLastCommand(true)); + + sendReply(op); + + CPPUNIT_ASSERT_EQUAL( + std::string("DeleteBucketCommand(BucketId(0x4000000000000001)) " + "Reasons to start: => 1"), + _sender.getLastCommand(true)); + +} + +void +MergeOperationTest::testFailIfSourceOnlyCopiesChanged() +{ + getClock().setAbsoluteTimeInSeconds(10); + + addNodesToBucketDB(document::BucketId(16, 1), + "0=10/1/1/t," + "1=20/1/1," + "2=10/1/1/t"); + + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + + MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " + "reasons to start: ) => 0"); + + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); + { + const api::MergeBucketCommand& cmd( + dynamic_cast<api::MergeBucketCommand&>(*_sender.commands[0])); + CPPUNIT_ASSERT_EQUAL(uint16_t(0), cmd.getSourceIndex()); + } + + // Source-only copy changed during merge + addNodesToBucketDB(document::BucketId(16, 1), + "0=10/1/1/t," + "1=40/1/1," + "2=10/1/1/t"); + sendReply(op); + // Should not be a remove here! + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); + CPPUNIT_ASSERT(!op.ok()); +} + +namespace { +std::string getNodeList(std::string state, uint32_t redundancy, std::string existing) { + lib::Distribution distribution( + lib::Distribution::getDefaultDistributionConfig(redundancy)); + lib::ClusterState clusterState(state); + vespalib::StringTokenizer st(existing, ","); + std::vector<BucketCopy> bucketDB(st.size()); + for (uint32_t i = 0; i < st.size(); i++) { + std::string num = st[i]; + size_t pos = num.find('t'); + bool trusted = false; + + if (pos != std::string::npos) { + num.erase(pos); + trusted = true; + } + bucketDB[i] = BucketCopy(0, atoi(num.c_str()), + api::BucketInfo(1, 2, 3)); + bucketDB[i].setTrusted(trusted); + } + std::vector<MergeMetaData> nodes(st.size()); + for (uint32_t i = 0; i < st.size(); i++) { + nodes[i] = MergeMetaData(bucketDB[i].getNode(), bucketDB[i]); + } + MergeLimiter limiter(16); + MergeOperation::generateSortedNodeList(distribution, clusterState, + document::BucketId(32, 1), + limiter, nodes); + std::ostringstream actual; + for (uint32_t i = 0; i < nodes.size(); i++) { + if (i != 0) { + actual << ","; + } + actual << nodes[i]._nodeIndex; + if (nodes[i]._sourceOnly) { + actual << "s"; + } + } + return actual.str(); +} +} + +void +MergeOperationTest::testGenerateNodeList() +{ + // If this fails, the distribution has changed and the rest of the test will + // likely fail + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1,4"), + getNodeList("storage:10", 10, "0,1,2,3,4,5,6,7,8,9")); + + // Nodes that are initializing should be treated as up + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7s,6s"), + getNodeList("storage:10 .3.s:i .5.s:i", 2, "7,6,3,5")); // Ideal: 3,5 + + // Order is given by ideal state algorithm, not order of storagenodes in bucket db + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7"), + getNodeList("storage:10", 3, "3,7,5")); + + // Node not in ideal state will be used if not enough nodes in ideal state + CPPUNIT_ASSERT_EQUAL( + std::string("3,7,6"), + getNodeList("storage:10", 3, "3,7,6")); + + // Nodes not in ideal state will be included as source only after redundancy + // is reached + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,8s"), + getNodeList("storage:10", 3, "3,5,7,8")); + + // Need at least redundancy copies that are not source only + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,8,9s"), + getNodeList("storage:10", 3, "3,5,8,9")); + + // Order is given by storagenodes in bucket db + // when no nodes are in ideal state + CPPUNIT_ASSERT_EQUAL( + std::string("4,1,2"), + getNodeList("storage:10", 3, "4,1,2")); + + CPPUNIT_ASSERT_EQUAL( + std::string("3,0s,1s,2s,4s,5s,6s,7s,8s,9s"), + getNodeList("storage:10", 1, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,0s,1s,2s,4s,6s,7s,8s,9s"), + getNodeList("storage:10", 2, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,0s,1s,2s,4s,6s,8s,9s"), + getNodeList("storage:10", 3, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,0s,1s,2s,4s,8s,9s"), + getNodeList("storage:10", 4, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0s,1s,2s,4s,9s"), + getNodeList("storage:10", 5, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,1s,2s,4s,9s"), + getNodeList("storage:10", 6, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,1s,2s,4s"), + getNodeList("storage:10", 7, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1s,4s"), + getNodeList("storage:10", 8, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1,4s"), + getNodeList("storage:10", 9, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1,4"), + getNodeList("storage:10", 10, "0,1,2,3,4,5,6,7,8,9")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,9s,8s,7s,6s,5s,4s,2s,1s,0s"), + getNodeList("storage:10", 1, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,9s,8s,7s,6s,4s,2s,1s,0s"), + getNodeList("storage:10", 2, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,9s,8s,6s,4s,2s,1s,0s"), + getNodeList("storage:10", 3, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,9s,8s,4s,2s,1s,0s"), + getNodeList("storage:10", 4, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,9s,4s,2s,1s,0s"), + getNodeList("storage:10", 5, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9s,4s,2s,1s"), + getNodeList("storage:10", 6, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,4s,2s,1s"), + getNodeList("storage:10", 7, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,4s,1s"), + getNodeList("storage:10", 8, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1,4s"), + getNodeList("storage:10", 9, "9,8,7,6,5,4,3,2,1,0")); + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1,4"), + getNodeList("storage:10", 10, "9,8,7,6,5,4,3,2,1,0")); + + // Trusted copies should not be source only. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,1,2,4s"), + getNodeList("storage:10", 7, "0,1t,2t,3,4,5,6,7,8,9")); + + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9,2,1s,4s"), + getNodeList("storage:10", 7, "0,1,2t,3,4,5,6,7,8,9")); + + // Retired nodes are not in ideal state + // Ideal: 5,7 + CPPUNIT_ASSERT_EQUAL( + std::string("0,2,3s"), + getNodeList("storage:10 .3.s:r", 2, "0,2,3")); + // Ideal: 5,7,6 + CPPUNIT_ASSERT_EQUAL( + std::string("0,2,3"), + getNodeList("storage:10 .3.s:r", 3, "0,2,3")); +} + +void +MergeOperationTest::doNotRemoveCopiesWithPendingMessages() +{ + document::BucketId bucket(16, 1); + + getClock().setAbsoluteTimeInSeconds(10); + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3")); + addNodesToBucketDB(bucket, + "0=10/1/1/t," + "1=20/1/1," + "2=10/1/1/t"); + + MergeOperation op(BucketAndNodes(bucket, + toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " + "reasons to start: ) => 0"); + + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); + + // Suddenly a wild operation appears to the source only copy! + // Removes are blocked by all and any operation types, so can just choose + // at will. + api::StorageMessage::SP msg( + new api::SetBucketStateCommand(bucket, api::SetBucketStateCommand::ACTIVE)); + msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1)); + _pendingTracker->insert(msg); + + + sendReply(op); + // Should not be a remove here! + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); + CPPUNIT_ASSERT(!op.ok()); +} + +void +MergeOperationTest::testDoNotRemoveActiveSourceOnlyCopies() +{ + getClock().setAbsoluteTimeInSeconds(10); + + addNodesToBucketDB(document::BucketId(16, 1), + "0=10/1/1/t," + "1=20/1/1/u/a," + "2=10/1/1/t"); + + _distributor->enableClusterState( + lib::ClusterState("distributor:1 storage:3")); + MergeOperation op(BucketAndNodes(document::BucketId(16, 1), + toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + std::string merge( + "MergeBucketCommand(BucketId(0x4000000000000001), to time " + "10000000, cluster state version: 0, nodes: [0, 2, 1 " + "(source only)], chain: [], reasons to start: ) => 0"); + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); + + sendReply(op); + // No DeleteBucket shall have been sent + CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true)); +} + +void +MergeOperationTest::testMarkRedundantTrustedCopiesAsSourceOnly() +{ + // This test uses the same distribution as testGenerateNodeList(), i.e. + // an ideal state sequence of [3, 5, 7, 6, 8, 0, 9, 2, 1, 4] + + // 3 redundancy, 5 trusted -> 2 trusted source only. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6s,8s"), + getNodeList("storage:10", 3, "3t,5t,7t,6t,8t")); + + // 3 redundancy, 4 trusted -> 1 trusted source only. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6s,8s"), + getNodeList("storage:10", 3, "3t,5t,7t,6t,8")); + + // 3 redundancy, 3 trusted -> 0 trusted source only, 2 non-trusted sources. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6s,8s"), + getNodeList("storage:10", 3, "3t,5t,7t,6,8")); + + // 3 redundancy, 4 trusted -> 1 source only trusted. + // We allow marking a trusted, non-ideal copy as source even when we don't + // have #redundancy trusted _ideal_ copies, as long as we're left with >= + // #redundancy trusted copies in total. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8s"), + getNodeList("storage:10", 3, "3t,5t,7,6t,8t")); + + // Not sufficient number of trusted copies to mark any as source only. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8"), + getNodeList("storage:10", 3, "3t,5,7,6t,8t")); + + // Same as above, with all trusted copies being non-ideal. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8"), + getNodeList("storage:10", 3, "3,5,7,6t,8t")); + + // #redundancy of trusted, but none are ideal. Non-ideal trusted should + // not be marked as source only (though we can mark non-trusted non-ideal + // node as source only). + // Note the node reordering since trusted are added before the rest. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,8,0,9,6s"), + getNodeList("storage:10", 3, "3,5,7,6,8t,0t,9t")); + + // But allow for removing excess trusted, non-ideal copies. + CPPUNIT_ASSERT_EQUAL( + std::string("3,5,7,6,8,0,9s"), + getNodeList("storage:10", 3, "3,5,7,6t,8t,0t,9t")); +} + +void +MergeOperationTest::onlyMarkRedundantRetiredReplicasAsSourceOnly() +{ + // No nodes in ideal state and all nodes are retired. With redundancy of 2 + // we can only mark the last replica in the DB as source-only. Retired + // nodes are meant as source-only due to being migrated away from, but + // source-only nodes will have their replica removed after a successful + // merge, which we cannot allow to happen here. + CPPUNIT_ASSERT_EQUAL( + std::string("0,1,2s"), + getNodeList("storage:3 .0.s.:r .1.s:r .2.s:r", 2, "1,0,2")); +} + +} // distributor +} // storage |