summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/mergeoperationtest.cpp
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/distributor/mergeoperationtest.cpp
Publish
Diffstat (limited to 'storage/src/tests/distributor/mergeoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp430
1 files changed, 430 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
new file mode 100644
index 00000000000..a2373731bc3
--- /dev/null
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -0,0 +1,430 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <boost/lexical_cast.hpp>
+#include <cppunit/extensions/HelperMacros.h>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <tests/common/dummystoragelink.h>
+#include <vespa/storage/distributor/idealstatemanager.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storage/distributor/operations/idealstate/mergeoperation.h>
+#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storage/distributor/bucketdbupdater.h>
+#include <tests/distributor/distributortestutil.h>
+#include <vespa/vespalib/text/stringtokenizer.h>
+
+using std::shared_ptr;
+
+namespace storage {
+namespace distributor {
+
+class MergeOperationTest : public CppUnit::TestFixture,
+ public DistributorTestUtil
+{
+ CPPUNIT_TEST_SUITE(MergeOperationTest);
+ CPPUNIT_TEST(testSimple);
+ CPPUNIT_TEST(testFailIfSourceOnlyCopiesChanged);
+ CPPUNIT_TEST(testGenerateNodeList);
+ CPPUNIT_TEST(doNotRemoveCopiesWithPendingMessages);
+ CPPUNIT_TEST(testDoNotRemoveActiveSourceOnlyCopies);
+ CPPUNIT_TEST(testMarkRedundantTrustedCopiesAsSourceOnly);
+ CPPUNIT_TEST(onlyMarkRedundantRetiredReplicasAsSourceOnly);
+ CPPUNIT_TEST_SUITE_END();
+
+ std::unique_ptr<PendingMessageTracker> _pendingTracker;
+
+protected:
+ void testSimple();
+ void testFailIfSourceOnlyCopiesChanged();
+ void testGenerateNodeList();
+ void doNotRemoveCopiesWithPendingMessages();
+ void testDoNotRemoveActiveSourceOnlyCopies();
+ void testMarkRedundantTrustedCopiesAsSourceOnly();
+ void onlyMarkRedundantRetiredReplicasAsSourceOnly();
+
+public:
+ void setUp() {
+ createLinks();
+ _pendingTracker.reset(new PendingMessageTracker(getComponentRegister()));
+ _sender.setPendingMessageTracker(*_pendingTracker);
+ }
+
+ void tearDown() {
+ close();
+ }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(MergeOperationTest);
+
+void
+MergeOperationTest::testSimple()
+{
+ getClock().setAbsoluteTimeInSeconds(10);
+
+ addNodesToBucketDB(document::BucketId(16, 1),
+ "0=10/1/1/t,"
+ "1=20/1/1,"
+ "2=10/1/1/t");
+
+ _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
+
+ MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender, framework::MilliSecTime(0));
+
+ CPPUNIT_ASSERT_EQUAL(
+ std::string(
+ "MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "reasons to start: ) => 0"),
+ _sender.getLastCommand(true));
+
+ sendReply(op);
+
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("DeleteBucketCommand(BucketId(0x4000000000000001)) "
+ "Reasons to start: => 1"),
+ _sender.getLastCommand(true));
+
+}
+
+void
+MergeOperationTest::testFailIfSourceOnlyCopiesChanged()
+{
+ getClock().setAbsoluteTimeInSeconds(10);
+
+ addNodesToBucketDB(document::BucketId(16, 1),
+ "0=10/1/1/t,"
+ "1=20/1/1,"
+ "2=10/1/1/t");
+
+ _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
+
+ MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender, framework::MilliSecTime(0));
+
+ std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "reasons to start: ) => 0");
+
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+ {
+ const api::MergeBucketCommand& cmd(
+ dynamic_cast<api::MergeBucketCommand&>(*_sender.commands[0]));
+ CPPUNIT_ASSERT_EQUAL(uint16_t(0), cmd.getSourceIndex());
+ }
+
+ // Source-only copy changed during merge
+ addNodesToBucketDB(document::BucketId(16, 1),
+ "0=10/1/1/t,"
+ "1=40/1/1,"
+ "2=10/1/1/t");
+ sendReply(op);
+ // Should not be a remove here!
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+ CPPUNIT_ASSERT(!op.ok());
+}
+
+namespace {
+std::string getNodeList(std::string state, uint32_t redundancy, std::string existing) {
+ lib::Distribution distribution(
+ lib::Distribution::getDefaultDistributionConfig(redundancy));
+ lib::ClusterState clusterState(state);
+ vespalib::StringTokenizer st(existing, ",");
+ std::vector<BucketCopy> bucketDB(st.size());
+ for (uint32_t i = 0; i < st.size(); i++) {
+ std::string num = st[i];
+ size_t pos = num.find('t');
+ bool trusted = false;
+
+ if (pos != std::string::npos) {
+ num.erase(pos);
+ trusted = true;
+ }
+ bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
+ api::BucketInfo(1, 2, 3));
+ bucketDB[i].setTrusted(trusted);
+ }
+ std::vector<MergeMetaData> nodes(st.size());
+ for (uint32_t i = 0; i < st.size(); i++) {
+ nodes[i] = MergeMetaData(bucketDB[i].getNode(), bucketDB[i]);
+ }
+ MergeLimiter limiter(16);
+ MergeOperation::generateSortedNodeList(distribution, clusterState,
+ document::BucketId(32, 1),
+ limiter, nodes);
+ std::ostringstream actual;
+ for (uint32_t i = 0; i < nodes.size(); i++) {
+ if (i != 0) {
+ actual << ",";
+ }
+ actual << nodes[i]._nodeIndex;
+ if (nodes[i]._sourceOnly) {
+ actual << "s";
+ }
+ }
+ return actual.str();
+}
+}
+
+void
+MergeOperationTest::testGenerateNodeList()
+{
+ // If this fails, the distribution has changed and the rest of the test will
+ // likely fail
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1,4"),
+ getNodeList("storage:10", 10, "0,1,2,3,4,5,6,7,8,9"));
+
+ // Nodes that are initializing should be treated as up
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7s,6s"),
+ getNodeList("storage:10 .3.s:i .5.s:i", 2, "7,6,3,5")); // Ideal: 3,5
+
+ // Order is given by ideal state algorithm, not order of storagenodes in bucket db
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7"),
+ getNodeList("storage:10", 3, "3,7,5"));
+
+ // Node not in ideal state will be used if not enough nodes in ideal state
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,7,6"),
+ getNodeList("storage:10", 3, "3,7,6"));
+
+ // Nodes not in ideal state will be included as source only after redundancy
+ // is reached
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,8s"),
+ getNodeList("storage:10", 3, "3,5,7,8"));
+
+ // Need at least redundancy copies that are not source only
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,8,9s"),
+ getNodeList("storage:10", 3, "3,5,8,9"));
+
+ // Order is given by storagenodes in bucket db
+ // when no nodes are in ideal state
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("4,1,2"),
+ getNodeList("storage:10", 3, "4,1,2"));
+
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,0s,1s,2s,4s,5s,6s,7s,8s,9s"),
+ getNodeList("storage:10", 1, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,0s,1s,2s,4s,6s,7s,8s,9s"),
+ getNodeList("storage:10", 2, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,0s,1s,2s,4s,6s,8s,9s"),
+ getNodeList("storage:10", 3, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,0s,1s,2s,4s,8s,9s"),
+ getNodeList("storage:10", 4, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0s,1s,2s,4s,9s"),
+ getNodeList("storage:10", 5, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,1s,2s,4s,9s"),
+ getNodeList("storage:10", 6, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,1s,2s,4s"),
+ getNodeList("storage:10", 7, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1s,4s"),
+ getNodeList("storage:10", 8, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1,4s"),
+ getNodeList("storage:10", 9, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1,4"),
+ getNodeList("storage:10", 10, "0,1,2,3,4,5,6,7,8,9"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,9s,8s,7s,6s,5s,4s,2s,1s,0s"),
+ getNodeList("storage:10", 1, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,9s,8s,7s,6s,4s,2s,1s,0s"),
+ getNodeList("storage:10", 2, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,9s,8s,6s,4s,2s,1s,0s"),
+ getNodeList("storage:10", 3, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,9s,8s,4s,2s,1s,0s"),
+ getNodeList("storage:10", 4, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,9s,4s,2s,1s,0s"),
+ getNodeList("storage:10", 5, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9s,4s,2s,1s"),
+ getNodeList("storage:10", 6, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,4s,2s,1s"),
+ getNodeList("storage:10", 7, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,4s,1s"),
+ getNodeList("storage:10", 8, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1,4s"),
+ getNodeList("storage:10", 9, "9,8,7,6,5,4,3,2,1,0"));
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1,4"),
+ getNodeList("storage:10", 10, "9,8,7,6,5,4,3,2,1,0"));
+
+ // Trusted copies should not be source only.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,1,2,4s"),
+ getNodeList("storage:10", 7, "0,1t,2t,3,4,5,6,7,8,9"));
+
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9,2,1s,4s"),
+ getNodeList("storage:10", 7, "0,1,2t,3,4,5,6,7,8,9"));
+
+ // Retired nodes are not in ideal state
+ // Ideal: 5,7
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("0,2,3s"),
+ getNodeList("storage:10 .3.s:r", 2, "0,2,3"));
+ // Ideal: 5,7,6
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("0,2,3"),
+ getNodeList("storage:10 .3.s:r", 3, "0,2,3"));
+}
+
+void
+MergeOperationTest::doNotRemoveCopiesWithPendingMessages()
+{
+ document::BucketId bucket(16, 1);
+
+ getClock().setAbsoluteTimeInSeconds(10);
+ _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:3"));
+ addNodesToBucketDB(bucket,
+ "0=10/1/1/t,"
+ "1=20/1/1,"
+ "2=10/1/1/t");
+
+ MergeOperation op(BucketAndNodes(bucket,
+ toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender, framework::MilliSecTime(0));
+
+ std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "reasons to start: ) => 0");
+
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+
+ // Suddenly a wild operation appears to the source only copy!
+ // Removes are blocked by all and any operation types, so can just choose
+ // at will.
+ api::StorageMessage::SP msg(
+ new api::SetBucketStateCommand(bucket, api::SetBucketStateCommand::ACTIVE));
+ msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 1));
+ _pendingTracker->insert(msg);
+
+
+ sendReply(op);
+ // Should not be a remove here!
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+ CPPUNIT_ASSERT(!op.ok());
+}
+
+void
+MergeOperationTest::testDoNotRemoveActiveSourceOnlyCopies()
+{
+ getClock().setAbsoluteTimeInSeconds(10);
+
+ addNodesToBucketDB(document::BucketId(16, 1),
+ "0=10/1/1/t,"
+ "1=20/1/1/u/a,"
+ "2=10/1/1/t");
+
+ _distributor->enableClusterState(
+ lib::ClusterState("distributor:1 storage:3"));
+ MergeOperation op(BucketAndNodes(document::BucketId(16, 1),
+ toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender, framework::MilliSecTime(0));
+
+ std::string merge(
+ "MergeBucketCommand(BucketId(0x4000000000000001), to time "
+ "10000000, cluster state version: 0, nodes: [0, 2, 1 "
+ "(source only)], chain: [], reasons to start: ) => 0");
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+
+ sendReply(op);
+ // No DeleteBucket shall have been sent
+ CPPUNIT_ASSERT_EQUAL(merge, _sender.getLastCommand(true));
+}
+
+void
+MergeOperationTest::testMarkRedundantTrustedCopiesAsSourceOnly()
+{
+ // This test uses the same distribution as testGenerateNodeList(), i.e.
+ // an ideal state sequence of [3, 5, 7, 6, 8, 0, 9, 2, 1, 4]
+
+ // 3 redundancy, 5 trusted -> 2 trusted source only.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6s,8s"),
+ getNodeList("storage:10", 3, "3t,5t,7t,6t,8t"));
+
+ // 3 redundancy, 4 trusted -> 1 trusted source only.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6s,8s"),
+ getNodeList("storage:10", 3, "3t,5t,7t,6t,8"));
+
+ // 3 redundancy, 3 trusted -> 0 trusted source only, 2 non-trusted sources.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6s,8s"),
+ getNodeList("storage:10", 3, "3t,5t,7t,6,8"));
+
+ // 3 redundancy, 4 trusted -> 1 source only trusted.
+ // We allow marking a trusted, non-ideal copy as source even when we don't
+ // have #redundancy trusted _ideal_ copies, as long as we're left with >=
+ // #redundancy trusted copies in total.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8s"),
+ getNodeList("storage:10", 3, "3t,5t,7,6t,8t"));
+
+ // Not sufficient number of trusted copies to mark any as source only.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8"),
+ getNodeList("storage:10", 3, "3t,5,7,6t,8t"));
+
+ // Same as above, with all trusted copies being non-ideal.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8"),
+ getNodeList("storage:10", 3, "3,5,7,6t,8t"));
+
+ // #redundancy of trusted, but none are ideal. Non-ideal trusted should
+ // not be marked as source only (though we can mark non-trusted non-ideal
+ // node as source only).
+ // Note the node reordering since trusted are added before the rest.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,8,0,9,6s"),
+ getNodeList("storage:10", 3, "3,5,7,6,8t,0t,9t"));
+
+ // But allow for removing excess trusted, non-ideal copies.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("3,5,7,6,8,0,9s"),
+ getNodeList("storage:10", 3, "3,5,7,6t,8t,0t,9t"));
+}
+
+void
+MergeOperationTest::onlyMarkRedundantRetiredReplicasAsSourceOnly()
+{
+ // No nodes in ideal state and all nodes are retired. With redundancy of 2
+ // we can only mark the last replica in the DB as source-only. Retired
+ // nodes are meant as source-only due to being migrated away from, but
+ // source-only nodes will have their replica removed after a successful
+ // merge, which we cannot allow to happen here.
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("0,1,2s"),
+ getNodeList("storage:3 .0.s.:r .1.s:r .2.s:r", 2, "1,0,2"));
+}
+
+} // distributor
+} // storage