summary | refs | log | tree | commit | diff | stats
path: root/storage/src/tests/persistence/mergehandlertest.cpp
diff options
context:
space:
mode:
author    Tor Egge <Tor.Egge@broadpark.no>    2020-10-26 20:12:51 +0100
committer Tor Egge <Tor.Egge@broadpark.no>    2020-10-26 20:25:55 +0100
commit    243a0602d6d7d6c6cf4ab759fe9f394631f2ba0c (patch)
tree      7251b2840198e01a9c3241d4002bcb5892c33043 /storage/src/tests/persistence/mergehandlertest.cpp
parent    704493e3c14440a943b5e008ad1aeb076c2a8189 (diff)
Add unit test for bucket merge with partially filled diff from last source only node.
Diffstat (limited to 'storage/src/tests/persistence/mergehandlertest.cpp')
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp  184
1 file changed, 181 insertions(+), 3 deletions(-)
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 8aacb6027e7..a7f1ab5a58d 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -1090,7 +1090,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x7;
- EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_TRUE(status.diff.empty());
}
@@ -1106,7 +1106,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x6;
- EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_EQ(2, status.diff.size());
}
@@ -1124,7 +1124,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x5;
- EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_EQ(2, status.diff.size());
}
}
@@ -1176,4 +1176,182 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
EXPECT_TRUE(foundTimestamp);
}
+namespace {
+
+storage::api::GetBucketDiffCommand::Entry
+make_entry(uint64_t timestamp, uint16_t mask) {
+ storage::api::GetBucketDiffCommand::Entry entry;
+ entry._timestamp = timestamp;
+ entry._gid = document::GlobalId();
+ entry._headerSize = 0;
+ entry._bodySize = 0;
+ entry._flags = MergeHandler::StateFlag::IN_USE;
+ entry._hasMask = mask;
+ return entry;
+}
+
+void
+fill_entry(storage::api::ApplyBucketDiffCommand::Entry &e, const document::Document& doc, const document::DocumentTypeRepo &repo)
+{
+ e._docName = doc.getId().toString();
+ vespalib::nbostream stream;
+ doc.serialize(stream);
+ e._headerBlob.resize(stream.size());
+ memcpy(&e._headerBlob[0], stream.peek(), stream.size());
+ e._repo = &repo;
+}
+
+/*
+ * Helper class to check both timestamp and mask at once.
+ */
+struct EntryCheck
+{
+ uint64_t _timestamp;
+ uint16_t _hasMask;
+
+ EntryCheck(uint64_t timestamp, uint16_t hasMask)
+ : _timestamp(timestamp),
+ _hasMask(hasMask)
+ {
+ }
+ bool operator==(const api::GetBucketDiffCommand::Entry &rhs) const {
+ return _timestamp == rhs._timestamp && _hasMask == rhs._hasMask;
+ }
+};
+
+std::ostream &operator<<(std::ostream &os, const EntryCheck &entry)
+{
+ os << "EntryCheck(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")";
+ return os;
+}
+
+}
+
+namespace api {
+
+std::ostream &operator<<(std::ostream &os, const MergeBucketCommand::Node &node)
+{
+ os << "Node(" << node.index << "," << (node.sourceOnly ? "true" : "false") << ")";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &entry)
+{
+ os << "Entry(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")";
+ return os;
+}
+
+}
+
+TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+{
+ using NodeList = decltype(_nodes);
+ // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
+ _nodes.clear();
+ _nodes.emplace_back(0, false);
+ _nodes.emplace_back(1, false);
+ _nodes.emplace_back(2, true);
+ _nodes.emplace_back(3, true);
+ _nodes.emplace_back(4, true);
+ _maxTimestamp = 30000; // Extend timestamp range to include doc1 and doc2
+
+ auto doc1 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 1);
+ auto doc2 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 2);
+
+ MergeHandler handler = createHandler();
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ cmd->setSourceIndex(1234);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
+ ASSERT_EQ(1u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
+ size_t baseline_diff_size = 0;
+ {
+ LOG(debug, "checking GetBucketDiff command");
+ auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]);
+ EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes()));
+ EXPECT_EQ(1, cmd2.getAddress()->getIndex());
+ EXPECT_EQ(1234, cmd2.getSourceIndex());
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ baseline_diff_size = cmd2.getDiff().size();
+ auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
+ auto &diff = reply->getDiff();
+ // doc1 and doc2 is present on nodes 3 and 4.
+ diff.push_back(make_entry(20000, ((1 << 3) | (1 << 4))));
+ diff.push_back(make_entry(20100, ((1 << 3) | (1 << 4))));
+ EXPECT_EQ(baseline_diff_size + 2u, reply->getDiff().size());
+ handler.handleGetBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "sent handleGetBucketDiffReply");
+ }
+ ASSERT_EQ(2u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
+ {
+ LOG(debug, "checking first ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ // Node 4 has been eliminated before the first ApplyBucketDiff command
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]);
+ auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
+ // ApplyBucketDiffCommand has a shorter node list, node 2 is not present
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(2u, diff.size());
+ EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
+ EXPECT_EQ(EntryCheck(20100u, 4u), diff[1]._entry);
+ fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
+ diff[0]._entry._hasMask |= 2u;
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled first ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(3u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[2]->getType());
+ {
+ LOG(debug, "checking second ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]);
+ auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(1u, diff.size());
+ EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
+ fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
+ diff[0]._entry._hasMask |= 2u;
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled second ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(4u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[3]->getType());
+ {
+ LOG(debug, "checking third ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size, s.diff.size());
+ auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(baseline_diff_size, diff.size());
+ for (auto& e : diff) {
+ EXPECT_EQ(1u, e._entry._hasMask);
+ e._entry._hasMask |= 2u;
+ }
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled third ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(5u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType());
+ LOG(debug, "got mergebucket reply");
+}
+
} // storage