diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storage/src/tests/distributor/joinbuckettest.cpp |
Publish
Diffstat (limited to 'storage/src/tests/distributor/joinbuckettest.cpp')
-rw-r--r-- | storage/src/tests/distributor/joinbuckettest.cpp | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp new file mode 100644 index 00000000000..ec7e3aaac32 --- /dev/null +++ b/storage/src/tests/distributor/joinbuckettest.cpp @@ -0,0 +1,127 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <cppunit/extensions/HelperMacros.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storage/distributor/operations/idealstate/joinoperation.h> +#include <vespa/storage/distributor/idealstatemanager.h> +#include <tests/distributor/distributortestutil.h> + +namespace storage { +namespace distributor { + +class JoinOperationTest : public CppUnit::TestFixture, public DistributorTestUtil +{ + CPPUNIT_TEST_SUITE(JoinOperationTest); + CPPUNIT_TEST(testSimple); + CPPUNIT_TEST(sendSparseJoinsToNodesWithoutBothSourceBuckets); + CPPUNIT_TEST_SUITE_END(); + + void checkSourceBucketsAndSendReply( + JoinOperation& op, + size_t msgIndex, + const std::vector<document::BucketId>& wantedIds); + +protected: + void testSimple(); + void sendSparseJoinsToNodesWithoutBothSourceBuckets(); + +public: + void setUp() { + createLinks(); + }; + + void tearDown() { + close(); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(JoinOperationTest); + +void +JoinOperationTest::testSimple() +{ + getConfig().setJoinCount(100); + getConfig().setJoinSize(1000); + + addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300"); + addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200"); + + _distributor->enableClusterState(lib::ClusterState("distributor:1 storage:1")); + + JoinOperation op("storage", + BucketAndNodes(document::BucketId(32, 0), + toVector<uint16_t>(0)), + toVector(document::BucketId(33, 1), + document::BucketId(33, 0x100000001))); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); + + CPPUNIT_ASSERT(!getBucket(document::BucketId(33, 0x100000001)).valid()); + CPPUNIT_ASSERT(!getBucket(document::BucketId(33, 1)).valid()); + + BucketDatabase::Entry entry = getBucket(document::BucketId(32, 0)); + CPPUNIT_ASSERT(entry.valid()); + CPPUNIT_ASSERT_EQUAL((uint16_t)0, entry->getNodeRef(0).getNode()); + CPPUNIT_ASSERT_EQUAL(api::BucketInfo(666, 90, 500), + entry->getNodeRef(0).getBucketInfo()); +} + +void +JoinOperationTest::checkSourceBucketsAndSendReply( + JoinOperation& op, + size_t msgIndex, + const std::vector<document::BucketId>& wantedIds) +{ + CPPUNIT_ASSERT(_sender.commands.size() > msgIndex); + + std::shared_ptr<api::StorageCommand> msg(_sender.commands[msgIndex]); + CPPUNIT_ASSERT_EQUAL(api::MessageType::JOINBUCKETS, msg->getType()); + + api::JoinBucketsCommand& joinCmd( + dynamic_cast<api::JoinBucketsCommand&>(*msg)); + CPPUNIT_ASSERT_EQUAL(wantedIds, joinCmd.getSourceBuckets()); + + std::shared_ptr<api::StorageReply> reply(joinCmd.makeReply()); + api::JoinBucketsReply& sreply( + dynamic_cast<api::JoinBucketsReply&>(*reply)); + sreply.setBucketInfo(api::BucketInfo(666, 90, 500)); + + op.receive(_sender, reply); +} + +/** + * If the set of buckets kept on nodes is disjoint, send sparse joins (same + * bucket id used as both source buckets) for those nodes having only one of + * the buckets. + */ +void +JoinOperationTest::sendSparseJoinsToNodesWithoutBothSourceBuckets() +{ + getConfig().setJoinCount(100); + getConfig().setJoinSize(1000); + + addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300,1=250/50/300"); + addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200"); + + _distributor->enableClusterState( + lib::ClusterState("distributor:1 storage:2")); + + JoinOperation op("storage", + BucketAndNodes(document::BucketId(32, 0), + toVector<uint16_t>(0, 1)), + toVector(document::BucketId(33, 1), + document::BucketId(33, 0x100000001))); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender, framework::MilliSecTime(0)); + + checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); + checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}); +} + +} + +} |