diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-10-26 20:12:51 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-10-26 20:25:55 +0100 |
commit | 243a0602d6d7d6c6cf4ab759fe9f394631f2ba0c (patch) | |
tree | 7251b2840198e01a9c3241d4002bcb5892c33043 /storage/src/tests/persistence/mergehandlertest.cpp | |
parent | 704493e3c14440a943b5e008ad1aeb076c2a8189 (diff) |
Add unit test for bucket merge with partially filled diff from last source only node.
Diffstat (limited to 'storage/src/tests/persistence/mergehandlertest.cpp')
-rw-r--r-- | storage/src/tests/persistence/mergehandlertest.cpp | 184 |
1 files changed, 181 insertions, 3 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 8aacb6027e7..a7f1ab5a58d 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -1090,7 +1090,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x7; - EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_TRUE(status.diff.empty()); } @@ -1106,7 +1106,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x6; - EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_EQ(2, status.diff.size()); } @@ -1124,7 +1124,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x5; - EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_EQ(2, status.diff.size()); } } @@ -1176,4 +1176,182 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { EXPECT_TRUE(foundTimestamp); } +namespace { + +storage::api::GetBucketDiffCommand::Entry +make_entry(uint64_t timestamp, uint16_t mask) { + storage::api::GetBucketDiffCommand::Entry entry; + entry._timestamp = timestamp; + entry._gid = document::GlobalId(); + entry._headerSize = 0; + entry._bodySize = 0; + entry._flags = MergeHandler::StateFlag::IN_USE; + entry._hasMask = mask; + return entry; +} + +void +fill_entry(storage::api::ApplyBucketDiffCommand::Entry &e, const document::Document& doc, const document::DocumentTypeRepo &repo) +{ + e._docName = doc.getId().toString(); + vespalib::nbostream stream; + doc.serialize(stream); + e._headerBlob.resize(stream.size()); + memcpy(&e._headerBlob[0], stream.peek(), stream.size()); + e._repo = &repo; +} + +/* + * Helper class to check both timestamp and mask at once. + */ +struct EntryCheck +{ + uint64_t _timestamp; + uint16_t _hasMask; + + EntryCheck(uint64_t timestamp, uint16_t hasMask) + : _timestamp(timestamp), + _hasMask(hasMask) + { + } + bool operator==(const api::GetBucketDiffCommand::Entry &rhs) const { + return _timestamp == rhs._timestamp && _hasMask == rhs._hasMask; + } +}; + +std::ostream &operator<<(std::ostream &os, const EntryCheck &entry) +{ + os << "EntryCheck(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")"; + return os; +} + +} + +namespace api { + +std::ostream &operator<<(std::ostream &os, const MergeBucketCommand::Node &node) +{ + os << "Node(" << node.index << "," << (node.sourceOnly ? "true" : "false") << ")"; + return os; +} + +std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &entry) +{ + os << "Entry(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")"; + return os; +} + +} + +TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +{ + using NodeList = decltype(_nodes); + // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 + _nodes.clear(); + _nodes.emplace_back(0, false); + _nodes.emplace_back(1, false); + _nodes.emplace_back(2, true); + _nodes.emplace_back(3, true); + _nodes.emplace_back(4, true); + _maxTimestamp = 30000; // Extend timestamp range to include doc1 and doc2 + + auto doc1 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 1); + auto doc2 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 2); + + MergeHandler handler = createHandler(); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + cmd->setSourceIndex(1234); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); + ASSERT_EQ(1u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType()); + size_t baseline_diff_size = 0; + { + LOG(debug, "checking GetBucketDiff command"); + auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]); + EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes())); + EXPECT_EQ(1, cmd2.getAddress()->getIndex()); + EXPECT_EQ(1234, cmd2.getSourceIndex()); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + baseline_diff_size = cmd2.getDiff().size(); + auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2); + auto &diff = reply->getDiff(); + // doc1 and doc2 is present on nodes 3 and 4. + diff.push_back(make_entry(20000, ((1 << 3) | (1 << 4)))); + diff.push_back(make_entry(20100, ((1 << 3) | (1 << 4)))); + EXPECT_EQ(baseline_diff_size + 2u, reply->getDiff().size()); + handler.handleGetBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "sent handleGetBucketDiffReply"); + } + ASSERT_EQ(2u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType()); + { + LOG(debug, "checking first ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + // Node 4 has been eliminated before the first ApplyBucketDiff command + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); + EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); + EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]); + auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); + // ApplyBucketDiffCommand has a shorter node list, node 2 is not present + EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto& diff = reply->getDiff(); + EXPECT_EQ(2u, diff.size()); + EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry); + EXPECT_EQ(EntryCheck(20100u, 4u), diff[1]._entry); + fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo()); + diff[0]._entry._hasMask |= 2u; + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled first ApplyBucketDiffReply"); + } + ASSERT_EQ(3u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[2]->getType()); + { + LOG(debug, "checking second ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); + EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]); + auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); + auto& diff = reply->getDiff(); + EXPECT_EQ(1u, diff.size()); + EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); + fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); + diff[0]._entry._hasMask |= 2u; + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled second ApplyBucketDiffReply"); + } + ASSERT_EQ(4u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[3]->getType()); + { + LOG(debug, "checking third ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command + EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); + EXPECT_EQ(baseline_diff_size, s.diff.size()); + auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); + EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); + auto& diff = reply->getDiff(); + EXPECT_EQ(baseline_diff_size, diff.size()); + for (auto& e : diff) { + EXPECT_EQ(1u, e._entry._hasMask); + e._entry._hasMask |= 2u; + } + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled third ApplyBucketDiffReply"); + } + ASSERT_EQ(5u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType()); + LOG(debug, "got mergebucket reply"); +} + } // storage |