// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "dummy_cluster_context.h"
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/size_literals.h>

using document::test::makeDocumentBucket;
using namespace document;
using namespace ::testing;

namespace storage::distributor {

struct SplitOperationTest : Test, DistributorStripeTestUtil {
    uint32_t splitByteSize;
    uint32_t tooLargeBucketSize;
    uint32_t splitCount;
    uint32_t maxSplitBits;

    SplitOperationTest();

    void SetUp() override {
        createLinks();
        auto cfg = make_config();
        cfg->setSplitCount(splitCount);
        cfg->setSplitSize(splitByteSize);
        configure_stripe(cfg);
    }

    void TearDown() override {
        close();
    }
};

SplitOperationTest::SplitOperationTest()
    : splitByteSize(10_Mi),
      tooLargeBucketSize(splitByteSize * 1.1),
      splitCount(UINT32_MAX),
      maxSplitBits(58)
{
}

namespace {

api::StorageMessageAddress _storage0Address(dummy_cluster_context.cluster_name_ptr(),
                                            lib::NodeType::STORAGE, 0);

}

TEST_F(SplitOperationTest, simple) {
    enable_cluster_state("distributor:1 storage:1");

    insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, tooLargeBucketSize, true);

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)),
                      maxSplitBits, splitCount, splitByteSize);

    op.setIdealStateManager(&getIdealStateManager());
    op.start(_sender);

    {
        ASSERT_EQ(1, _sender.commands().size());

        std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
        ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
        EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString());

        std::shared_ptr<api::StorageReply> reply(msg->makeReply());
        auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());

        sreply->getSplitInfo().emplace_back(document::BucketId(17, 1),
                                            api::BucketInfo(100, 600, 5000000));
        sreply->getSplitInfo().emplace_back(document::BucketId(17, 0x10001),
                                            api::BucketInfo(110, 400, 6000000));

        op.receive(_sender, reply);
    }

    ASSERT_FALSE(getBucket(document::BucketId(16, 1)).valid());

    {
        BucketDatabase::Entry entry = getBucket(document::BucketId(17, 1));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(1, entry->getNodeCount());
        EXPECT_EQ(0, entry->getNodeRef(0).getNode());
        EXPECT_EQ(100, entry->getNodeRef(0).getChecksum());
        EXPECT_EQ(5000000, entry->getNodeRef(0).getTotalDocumentSize());
        EXPECT_EQ(600, entry->getNodeRef(0).getDocumentCount());
    }

    {
        BucketDatabase::Entry entry(getBucket(document::BucketId(17, 0x10001)));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(1, entry->getNodeCount());
        EXPECT_EQ(0, entry->getNodeRef(0).getNode());
        EXPECT_EQ(110, entry->getNodeRef(0).getChecksum());
        EXPECT_EQ(6000000, entry->getNodeRef(0).getTotalDocumentSize());
        EXPECT_EQ(400, entry->getNodeRef(0).getDocumentCount());
    }
}

TEST_F(SplitOperationTest, multi_node_failure) {
    {
        BucketDatabase::Entry entry(document::BucketId(16, 1));

        BucketCopy copy(0, 0, api::BucketInfo(250, 1000, tooLargeBucketSize));
        entry->addNode(copy, toVector<uint16_t>(0));
        entry->addNode(BucketCopy(0, 1, copy.getBucketInfo()), toVector<uint16_t>(0));

        getBucketDatabase().update(entry);
    }

    enable_cluster_state("distributor:1 storage:2");

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1)),
                      maxSplitBits, splitCount, splitByteSize);

    op.setIdealStateManager(&getIdealStateManager());
    op.start(_sender);

    {
        ASSERT_EQ(2, _sender.commands().size());

        {
            std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
            ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
            EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString());
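            // Fabricate a successful split reply from node 0; node 1 is failed
            // below with NOT_CONNECTED, so only node 1 keeps the source bucket.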
            std::shared_ptr<api::StorageReply> reply(msg->makeReply());
            auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
            sreply.setResult(api::ReturnCode::OK);

            sreply.getSplitInfo().emplace_back(document::BucketId(17, 1),
                                               api::BucketInfo(100, 600, 5000000));
            sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001),
                                               api::BucketInfo(110, 400, 6000000));

            op.receive(_sender, reply);
        }

        sendReply(op, 1, api::ReturnCode::NOT_CONNECTED);
    }

    {
        BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(1, entry->getNodeCount());
        EXPECT_EQ(1, entry->getNodeRef(0).getNode());
        EXPECT_EQ(250, entry->getNodeRef(0).getChecksum());
        EXPECT_EQ(tooLargeBucketSize, entry->getNodeRef(0).getTotalDocumentSize());
        EXPECT_EQ(1000, entry->getNodeRef(0).getDocumentCount());
    }

    {
        BucketDatabase::Entry entry = getBucket(document::BucketId(17, 1));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(1, entry->getNodeCount());
        EXPECT_EQ(0, entry->getNodeRef(0).getNode());
        EXPECT_EQ(100, entry->getNodeRef(0).getChecksum());
        EXPECT_EQ(5000000, entry->getNodeRef(0).getTotalDocumentSize());
        EXPECT_EQ(600, entry->getNodeRef(0).getDocumentCount());
    }

    {
        BucketDatabase::Entry entry(getBucket(document::BucketId(17, 0x10001)));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(1, entry->getNodeCount());
        EXPECT_EQ(0, entry->getNodeRef(0).getNode());
        EXPECT_EQ(110, entry->getNodeRef(0).getChecksum());
        EXPECT_EQ(6000000, entry->getNodeRef(0).getTotalDocumentSize());
        EXPECT_EQ(400, entry->getNodeRef(0).getDocumentCount());
    }
}

TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) {
    enable_cluster_state("distributor:1 storage:2");

    document::BucketId sourceBucket(16, 1);
    /*
     * Need 3 nodes to reproduce bug 6418516. Otherwise, the source bucket is
     * left with only 1 copy which implicitly becomes trusted. When this copy
     * is then split, the distributor db will automatically un-trust all buckets
     * since it sees that multiple copies are trusted that are not consistent
     * with each other. This prevents the bug from being visible.
     */
    addNodesToBucketDB(sourceBucket, "0=150/20/30000000/t,1=450/50/60000/u,"
                                     "2=550/60/70000");

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(sourceBucket), toVector<uint16_t>(0, 1)),
                      maxSplitBits, splitCount, splitByteSize);

    op.setIdealStateManager(&getIdealStateManager());
    op.start(_sender);

    ASSERT_EQ(3, _sender.commands().size());

    std::vector<document::BucketId> childBuckets;
    childBuckets.emplace_back(17, 1);
    childBuckets.emplace_back(17, 0x10001);

    // Note: only 2 out of 3 requests replied to!
    for (int i = 0; i < 2; ++i) {
        std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
        ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
        std::shared_ptr<api::StorageReply> reply(msg->makeReply());
        auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());

        // Make sure copies differ so they cannot become implicitly trusted.
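        // The checksum is offset by the replying node index (100 + i / 110 + i),
        // so the two replicas of each child bucket report inconsistent info.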
        sreply->getSplitInfo().emplace_back(childBuckets[0],
                                            api::BucketInfo(100 + i, 600, 5000000));
        sreply->getSplitInfo().emplace_back(childBuckets[1],
                                            api::BucketInfo(110 + i, 400, 6000000));
        op.receive(_sender, reply);
    }

    ASSERT_TRUE(getBucket(sourceBucket).valid()); // Still alive

    for (uint32_t i = 0; i < 2; ++i) {
        BucketDatabase::Entry entry(getBucket(childBuckets[i]));

        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(2, entry->getNodes().size());

        for (uint16_t j = 0; j < 2; ++j) {
            EXPECT_FALSE(entry->getNodeRef(j).trusted());
        }
    }
}

TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
    StorageComponentRegisterImpl compReg;
    framework::defaultimplementation::FakeClock clock;
    compReg.setClock(clock);
    clock.setAbsoluteTimeInSeconds(1);
    OperationSequencer op_seq;

    enable_cluster_state("distributor:1 storage:2");

    document::BucketId joinTarget(2, 1);
    std::vector<document::BucketId> joinSources = {
        document::BucketId(3, 1), document::BucketId(3, 5)
    };
    auto joinCmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(joinTarget));
    joinCmd->getSourceBuckets() = joinSources;
    joinCmd->setAddress(_storage0Address);

    pending_message_tracker().insert(joinCmd);

    insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, true);

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(joinTarget), toVector<uint16_t>(0)),
                      maxSplitBits, splitCount, splitByteSize);

    EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));

    // Now, pretend there's a join for another node in the same bucket. This
    // will happen when a join is partially completed.
    pending_message_tracker().clearMessagesForNode(0);
    EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));

    joinCmd->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(),
                                                           lib::NodeType::STORAGE, 1));
    pending_message_tracker().insert(joinCmd);
    EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}

TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
    StorageComponentRegisterImpl compReg;
    framework::defaultimplementation::FakeClock clock;
    compReg.setClock(clock);
    clock.setAbsoluteTimeInSeconds(1);
    OperationSequencer op_seq;

    enable_cluster_state("distributor:1 storage:2");

    document::BucketId source_bucket(16, 1);
    insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, true);

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
                      maxSplitBits, splitCount, splitByteSize);

    EXPECT_FALSE(op.isBlocked(operation_context(), op_seq));
    auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo");
    EXPECT_TRUE(token.valid());
    EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}

TEST_F(SplitOperationTest, cancelled_node_does_not_update_bucket_db) {
    enable_cluster_state("distributor:1 storage:1");
    insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, tooLargeBucketSize, true);

    SplitOperation op(dummy_cluster_context,
                      BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)),
                      maxSplitBits, splitCount, splitByteSize);

    op.setIdealStateManager(&getIdealStateManager());
    op.start(_sender);

    op.cancel(_sender, CancelScope::of_node_subset({0}));

    {
        ASSERT_EQ(_sender.commands().size(), 1);
        std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
        std::shared_ptr<api::StorageReply> reply(msg->makeReply());
        auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
        sreply.getSplitInfo().emplace_back(document::BucketId(17, 1),
                                           api::BucketInfo(100, 600, 5000000));
        sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001),
                                           api::BucketInfo(110, 400, 6000000));
        op.receive(_sender, reply);
    }
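    // Node 0 was cancelled before the reply arrived, so the operation must
    // ignore the split result instead of applying it.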
    // DB is not touched, so the source bucket remains (it would be removed
    // during the actual operation) while the target buckets are not created.
    EXPECT_TRUE(getBucket(document::BucketId(16, 1)).valid());
    EXPECT_FALSE(getBucket(document::BucketId(17, 0x00001)).valid());
    EXPECT_FALSE(getBucket(document::BucketId(17, 0x10001)).valid());

    EXPECT_FALSE(op.ok());
}

} // storage::distributor