From 72231250ed81e10d66bfe70701e64fa5fe50f712 Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Wed, 15 Jun 2016 23:09:44 +0200 Subject: Publish --- storage/src/tests/distributor/joinbuckettest.cpp | 127 +++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 storage/src/tests/distributor/joinbuckettest.cpp (limited to 'storage/src/tests/distributor/joinbuckettest.cpp') diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp new file mode 100644 index 00000000000..ec7e3aaac32 --- /dev/null +++ b/storage/src/tests/distributor/joinbuckettest.cpp @@ -0,0 +1,127 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include +#include +#include +#include +#include +#include + +namespace storage { +namespace distributor { + +class JoinOperationTest : public CppUnit::TestFixture, public DistributorTestUtil +{ + CPPUNIT_TEST_SUITE(JoinOperationTest); + CPPUNIT_TEST(testSimple); + CPPUNIT_TEST(sendSparseJoinsToNodesWithoutBothSourceBuckets); + CPPUNIT_TEST_SUITE_END(); + + void checkSourceBucketsAndSendReply( + JoinOperation& op, + size_t msgIndex, + const std::vector& wantedIds); + +protected: + void testSimple(); + void sendSparseJoinsToNodesWithoutBothSourceBuckets(); + +public: + void setUp() { + createLinks(); + }; + + void tearDown() { + close(); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(JoinOperationTest); + +void +JoinOperationTest::testSimple() +{ + getConfig().setJoinCount(100); + getConfig().setJoinSize(1000); + + addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300"); + addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200"); + + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:1")); + + JoinOperation op("storage", + BucketAndNodes(document::BucketId(32, 0), + toVector(0)), + toVector(document::BucketId(33, 1), + document::BucketId(33, 0x100000001))); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); + + CPPUNIT_ASSERT(!getBucket(document::BucketId(33, 0x100000001)).valid()); + CPPUNIT_ASSERT(!getBucket(document::BucketId(33, 1)).valid()); + + BucketDatabase::Entry entry = getBucket(document::BucketId(32, 0)); + CPPUNIT_ASSERT(entry.valid()); + CPPUNIT_ASSERT_EQUAL((uint16_t)0, entry->getNodeRef(0).getNode()); + CPPUNIT_ASSERT_EQUAL(api::BucketInfo(666, 90, 500), + entry->getNodeRef(0).getBucketInfo()); +} + +void +JoinOperationTest::checkSourceBucketsAndSendReply( + JoinOperation& op, + size_t msgIndex, + const std::vector& wantedIds) +{ + CPPUNIT_ASSERT(_sender.commands.size() > msgIndex); + + std::shared_ptr msg(_sender.commands[msgIndex]); + CPPUNIT_ASSERT_EQUAL(api::MessageType::JOINBUCKETS, msg->getType()); + + api::JoinBucketsCommand& joinCmd( + dynamic_cast(*msg)); + CPPUNIT_ASSERT_EQUAL(wantedIds, joinCmd.getSourceBuckets()); + + std::shared_ptr reply(joinCmd.makeReply()); + api::JoinBucketsReply& sreply( + dynamic_cast(*reply)); + sreply.setBucketInfo(api::BucketInfo(666, 90, 500)); + + op.receive(_sender, reply); +} + +/** + * If the set of buckets kept on nodes is disjoint, send sparse joins (same + * bucket id used as both source buckets) for those nodes having only one of + * the buckets. + */ +void +JoinOperationTest::sendSparseJoinsToNodesWithoutBothSourceBuckets() +{ + getConfig().setJoinCount(100); + getConfig().setJoinSize(1000); + + addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300,1=250/50/300"); + addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200"); + + _distributor->enableClusterState( + lib::ClusterState("distributor:1 storage:2")); + + JoinOperation op("storage", + BucketAndNodes(document::BucketId(32, 0), + toVector(0, 1)), + toVector(document::BucketId(33, 1), + document::BucketId(33, 0x100000001))); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); + checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}); +} + +} + +} -- cgit v1.2.3