// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using document::FixedBucketSpaces; using namespace ::testing; namespace storage::distributor { struct IdealStateManagerTest : Test, DistributorTestUtil { IdealStateManagerTest() : Test(), DistributorTestUtil(), _bucketSpaces() {} void SetUp() override { createLinks(); _bucketSpaces = getBucketSpaces(); }; void TearDown() override { close(); } void setSystemState(const lib::ClusterState& systemState) { _distributor->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); } bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker) const { return op.checkBlock(bucket, tracker); } bool checkBlockForAllNodes(const IdealStateOperation& op, const document::Bucket& bucket, const PendingMessageTracker& tracker) const { return op.checkBlockForAllNodes(bucket, tracker); } std::vector _bucketSpaces; std::string makeBucketStatusString(const std::string &defaultSpaceBucketStatus); }; TEST_F(IdealStateManagerTest, sibling) { EXPECT_EQ(document::BucketId(1,1), getIdealStateManager().getDistributorComponent() .getSibling(document::BucketId(1, 0))); EXPECT_EQ(document::BucketId(1,0), getIdealStateManager().getDistributorComponent() .getSibling(document::BucketId(1, 1))); EXPECT_EQ(document::BucketId(2,3), getIdealStateManager().getDistributorComponent() .getSibling(document::BucketId(2, 1))); EXPECT_EQ(document::BucketId(2,1), getIdealStateManager().getDistributorComponent() .getSibling(document::BucketId(2, 3))); } TEST_F(IdealStateManagerTest, status_page) { close(); getDirConfig().getConfig("stor-distributormanager").set("splitsize", "100"); getDirConfig().getConfig("stor-distributormanager").set("splitcount", "1000000"); getDirConfig().getConfig("stor-distributormanager").set("joinsize", "0"); getDirConfig().getConfig("stor-distributormanager").set("joincount", "0"); createLinks(); setupDistributor(1, 1, "distributor:1 storage:1"); insertBucketInfo(document::BucketId(16, 5), 0, 0xff, 100, 200, true, true); insertBucketInfo(document::BucketId(16, 2), 0, 0xff, 10, 10, true, true); std::ostringstream ost; getIdealStateManager().getBucketStatus(ost); EXPECT_EQ(makeBucketStatusString("BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]
\n" "BucketId(0x4000000000000005): : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is " "higher than the configured limit of (100, 1000000)] [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true," "active=true,ready=false)]
\n"), ost.str()); } TEST_F(IdealStateManagerTest, disabled_state_checker) { setupDistributor(1, 1, "distributor:1 storage:1"); getConfig().setSplitSize(100); getConfig().setSplitCount(1000000); getConfig().disableStateChecker("SplitBucket"); insertBucketInfo(document::BucketId(16, 5), 0, 0xff, 100, 200, true, true); insertBucketInfo(document::BucketId(16, 2), 0, 0xff, 10, 10, true, true); std::ostringstream ost; getIdealStateManager().getBucketStatus(ost); EXPECT_EQ(makeBucketStatusString( "BucketId(0x4000000000000002) : [node(idx=0,crc=0xff,docs=10/10,bytes=10/10,trusted=true,active=true,ready=false)]
\n" "BucketId(0x4000000000000005): : split: [Splitting bucket because its maximum size (200 b, 100 docs, 100 meta, 200 b total) is " "higher than the configured limit of (100, 1000000)] [node(idx=0,crc=0xff,docs=100/100,bytes=200/200,trusted=true," "active=true,ready=false)]
\n"), ost.str()); tick(); EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); } TEST_F(IdealStateManagerTest, clear_active_on_node_down) { setSystemState(lib::ClusterState("distributor:1 storage:3")); for (int i = 1; i < 4; i++) { insertBucketInfo(document::BucketId(16, i), 0, 0xff, 100, 200); insertBucketInfo(document::BucketId(16, i), 1, 0xffe, 1020, 2300); insertBucketInfo(document::BucketId(16, i), 2, 0xfff, 1030, 2400); } tick(); // Start all three operations. for (uint32_t i = 0; i < 3; ++i) { tick(); } EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n" "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000002)) (pri 100)\n" "setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000003)) (pri 100)\n", _distributor->getActiveIdealStateOperations()); setSystemState(lib::ClusterState("distributor:1 storage:3 .0.s:d")); EXPECT_EQ("", _distributor->getActiveIdealStateOperations()); EXPECT_EQ(0, _distributor->getPendingMessageTracker() .getNodeInfo().getPendingCount(0)); } TEST_F(IdealStateManagerTest, recheck_when_active) { for (uint32_t j = 0; j < 3; j++) { insertBucketInfo(document::BucketId(16, 1), j, 0xff - j, 100, 200); } setSystemState(lib::ClusterState("distributor:1 storage:3")); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", _distributor->getActiveIdealStateOperations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", _distributor->getActiveIdealStateOperations()); tick(); EXPECT_EQ("setbucketstate to [0] Bucket(BucketSpace(0x0000000000000001), BucketId(0x4000000000000001)) (pri 100)\n", _distributor->getActiveIdealStateOperations()); } TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); document::BucketId bid(16, 1234); std::vector buckets; // RequestBucketInfoCommand does not have a specific bucketid since it's // sent to the entire node. It will then use a null bucketid. { auto msg = std::make_shared(makeBucketSpace(), buckets); msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 4)); tracker.insert(msg); } { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector(3, 4))); EXPECT_TRUE(op.isBlocked(tracker)); } { // Don't trigger on requests to other nodes. RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector(3, 5))); EXPECT_FALSE(op.isBlocked(tracker)); } // Don't block on null-bucket messages that aren't RequestBucketInfo. { auto msg = std::make_shared(makeBucketSpace(), "foo", "bar", "baz"); msg->setAddress(api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 7)); tracker.insert(msg); } { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector(7))); EXPECT_FALSE(op.isBlocked(tracker)); } } TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); document::BucketId bid(16, 1234); { auto msg = std::make_shared(makeDocumentBucket(bid)); msg->setAddress( api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 4)); tracker.insert(msg); } { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector(7))); // Not blocked for exact node match. EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker)); // But blocked for bucket match! EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker)); } } std::string IdealStateManagerTest::makeBucketStatusString(const std::string &defaultSpaceBucketStatus) { std::ostringstream ost; for (const auto &bucketSpace : _bucketSpaces) { ost << "

" << FixedBucketSpaces::to_string(bucketSpace) << " - " << bucketSpace << "

\n"; if (bucketSpace == FixedBucketSpaces::default_space()) { ost << defaultSpaceBucketStatus; } } return ost.str(); } } // storage::distributor