diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-10-27 15:20:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-27 15:20:15 +0100 |
commit | 4d64cbe4e531cb7be1061f1f54809d1d0a1b0061 (patch) | |
tree | 4eed3c0bb7009155a6e93aca0bf426445fcb759e /storage | |
parent | b8a721b2dfedd10ede9cf571fb3fb8060449b14a (diff) | |
parent | 91eb5d5e3a8ab31803569e0e5d68ff1d217d3853 (diff) |
Merge pull request #15041 from vespa-engine/toregge/add-unit-test-for-bucket-merge-with-partially-filled-diff-from-last-source-only-node
Add unit test for bucket merge with partially filled diff
Diffstat (limited to 'storage')
4 files changed, 268 insertions, 11 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index cda570a4396..7dad8260492 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -1089,7 +1089,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x7; - EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_TRUE(status.diff.empty()); } @@ -1105,7 +1105,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x6; - EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_EQ(2, status.diff.size()); } @@ -1123,7 +1123,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { applyDiff[1]._entry._flags = 0x3; applyDiff[1]._entry._hasMask = 0x5; - EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7)); + EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList)); EXPECT_EQ(2, status.diff.size()); } } @@ -1175,4 +1175,186 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { EXPECT_TRUE(foundTimestamp); } +namespace { + +storage::api::GetBucketDiffCommand::Entry +make_entry(uint64_t timestamp, uint16_t mask) { + storage::api::GetBucketDiffCommand::Entry entry; + entry._timestamp = timestamp; + entry._gid = document::GlobalId(); + entry._headerSize = 0; + entry._bodySize = 0; + entry._flags = MergeHandler::StateFlag::IN_USE; + entry._hasMask = mask; + return entry; +} + +void +fill_entry(storage::api::ApplyBucketDiffCommand::Entry &e, const document::Document& doc, const document::DocumentTypeRepo &repo) +{ + e._docName = doc.getId().toString(); + vespalib::nbostream stream; + doc.serialize(stream); + e._headerBlob.resize(stream.size()); + memcpy(&e._headerBlob[0], stream.peek(), stream.size()); + e._repo = &repo; +} + +/* + * Helper class to check both timestamp and mask at once. + */ +struct EntryCheck +{ + uint64_t _timestamp; + uint16_t _hasMask; + + EntryCheck(uint64_t timestamp, uint16_t hasMask) + : _timestamp(timestamp), + _hasMask(hasMask) + { + } + bool operator==(const api::GetBucketDiffCommand::Entry &rhs) const { + return _timestamp == rhs._timestamp && _hasMask == rhs._hasMask; + } +}; + +std::ostream &operator<<(std::ostream &os, const EntryCheck &entry) +{ + os << "EntryCheck(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")"; + return os; +} + +} + +namespace api { + +std::ostream &operator<<(std::ostream &os, const MergeBucketCommand::Node &node) +{ + os << "Node(" << node.index << "," << (node.sourceOnly ? "true" : "false") << ")"; + return os; +} + +std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &entry) +{ + os << "Entry(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")"; + return os; +} + +} + +TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +{ + using NodeList = decltype(_nodes); + // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 + _nodes.clear(); + _nodes.emplace_back(0, false); + _nodes.emplace_back(1, false); + _nodes.emplace_back(2, true); + _nodes.emplace_back(3, true); + _nodes.emplace_back(4, true); + _maxTimestamp = 30000; // Extend timestamp range to include doc1 and doc2 + + auto doc1 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 1); + auto doc2 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 2); + + MergeHandler handler = createHandler(); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + cmd->setSourceIndex(1234); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); + ASSERT_EQ(1u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType()); + size_t baseline_diff_size = 0; + { + LOG(debug, "checking GetBucketDiff command"); + auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]); + EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes())); + EXPECT_EQ(1, cmd2.getAddress()->getIndex()); + EXPECT_EQ(1234, cmd2.getSourceIndex()); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + baseline_diff_size = cmd2.getDiff().size(); + auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2); + auto &diff = reply->getDiff(); + // doc1 and doc2 is present on nodes 3 and 4. + diff.push_back(make_entry(20000, ((1 << 3) | (1 << 4)))); + diff.push_back(make_entry(20100, ((1 << 3) | (1 << 4)))); + EXPECT_EQ(baseline_diff_size + 2u, reply->getDiff().size()); + handler.handleGetBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "sent handleGetBucketDiffReply"); + } + ASSERT_EQ(2u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType()); + { + LOG(debug, "checking first ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + // Node 4 has been eliminated before the first ApplyBucketDiff command + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); + EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); + EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]); + auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); + // ApplyBucketDiffCommand has a shorter node list, node 2 is not present + EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto& diff = reply->getDiff(); + EXPECT_EQ(2u, diff.size()); + EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry); + EXPECT_EQ(EntryCheck(20100u, 4u), diff[1]._entry); + /* + * Only fill first diff entry to simulate max chunk size being exceeded + * when filling diff entries on source node (node 3). + */ + fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo()); + diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1. + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled first ApplyBucketDiffReply"); + } + ASSERT_EQ(3u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[2]->getType()); + { + LOG(debug, "checking second ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); + EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]); + auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); + auto& diff = reply->getDiff(); + EXPECT_EQ(1u, diff.size()); + EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); + fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); + diff[0]._entry._hasMask |= 2u; + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled second ApplyBucketDiffReply"); + } + ASSERT_EQ(4u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[3]->getType()); + { + LOG(debug, "checking third ApplyBucketDiff command"); + EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); + auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command + EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); + EXPECT_EQ(baseline_diff_size, s.diff.size()); + auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); + EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); + auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); + auto& diff = reply->getDiff(); + EXPECT_EQ(baseline_diff_size, diff.size()); + for (auto& e : diff) { + EXPECT_EQ(1u, e._entry._hasMask); + e._entry._hasMask |= 2u; + } + handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + LOG(debug, "handled third ApplyBucketDiffReply"); + } + ASSERT_EQ(5u, messageKeeper()._msgs.size()); + ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType()); + LOG(debug, "got mergebucket reply"); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index 6934f4754bb..97d3da28abe 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -3,11 +3,70 @@ #include "mergestatus.h" #include <ostream> #include <vespa/log/log.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <cassert> LOG_SETUP(".mergestatus"); namespace storage { +namespace { + +/* + * Class for remapping bit masks from a partial set of nodes to a full + * set of nodes. + */ +class MaskRemapper +{ + std::vector<uint16_t> _mask_remap; + +public: + MaskRemapper(const std::vector<api::MergeBucketCommand::Node> &all_nodes, + const std::vector<api::MergeBucketCommand::Node> &nodes); + ~MaskRemapper(); + + uint16_t operator()(uint16_t mask) const; +}; + +MaskRemapper::MaskRemapper(const std::vector<api::MergeBucketCommand::Node> &all_nodes, + const std::vector<api::MergeBucketCommand::Node> &nodes) + : _mask_remap() +{ + if (nodes != all_nodes) { + vespalib::hash_map<uint32_t, uint32_t> node_index_to_mask(all_nodes.size()); + uint16_t mask = 1u; + for (const auto& node : all_nodes) { + node_index_to_mask[node.index] = mask; + mask <<= 1; + } + _mask_remap.reserve(nodes.size()); + for (const auto& node : nodes) { + mask = node_index_to_mask[node.index]; + assert(mask != 0u); + _mask_remap.push_back(mask); + } + } +} + +MaskRemapper::~MaskRemapper() = default; + +uint16_t +MaskRemapper::operator()(uint16_t mask) const +{ + if (!_mask_remap.empty()) { + uint16_t new_mask = 0u; + for (uint32_t i = 0u; i < _mask_remap.size(); ++i) { + if ((mask & (1u << i)) != 0u) { + new_mask |= _mask_remap[i]; + } + } + mask = new_mask; + } + return mask; +} + +} + MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& lt, api::StorageMessage::Priority priority, uint32_t traceLevel) @@ -18,15 +77,21 @@ MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& MergeStatus::~MergeStatus() = default; +/* + * Note: hasMask parameter and _entry._hasMask in part vector are per-reply masks, + * based on the nodes returned in the ApplyBucketDiffReply. + */ bool MergeStatus::removeFromDiff( const std::vector<api::ApplyBucketDiffCommand::Entry>& part, - uint16_t hasMask) + uint16_t hasMask, + const std::vector<api::MergeBucketCommand::Node> &nodes) { std::deque<api::GetBucketDiffCommand::Entry>::iterator it(diff.begin()); std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it2( part.begin()); bool altered = false; + MaskRemapper remap_mask(nodeList, nodes); // We expect part array to be sorted in the same order as in the diff, // and that all entries in the part should exist in the source list. while (it != diff.end() && it2 != part.end()) { @@ -62,11 +127,19 @@ MergeStatus::removeFromDiff( } it = diff.erase(it); altered = true; - } else if (it2->_entry._hasMask != it->_hasMask) { - // Hasmasks have changed, meaning bucket contents changed on - // one or more of the nodes during merging. - altered = true; - it->_hasMask = it2->_entry._hasMask; + } else { + /* + * Remap from per-reply mask for the ApplyBucketDiffReply to a + * per-merge-operation mask with same bit assignment as _hasMask in + * the diff vector. + */ + uint16_t mask = remap_mask(it2->_entry._hasMask); + if (mask != it->_hasMask) { + // Hasmasks have changed, meaning bucket contents changed on + // one or more of the nodes during merging. + altered = true; + it->_hasMask = mask; + } } ++it2; } diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index 93a4b96c1a0..18ced81c280 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -32,11 +32,13 @@ public: ~MergeStatus(); /** + * Note: hasMask parameter and _entry._hasMask in part vector are per-reply masks, + * based on the nodes returned in ApplyBucketDiffReply. * @return true if any entries were removed from the internal diff * or the two diffs had entries with mismatching hasmasks, which * indicates that bucket contents have changed during the merge. */ - bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask); + bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes); void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool isFirstNode() const { return (reply.get() != 0); } }; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index cabd74f6777..6e7fc30bd6c 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1355,7 +1355,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } const size_t diffSizeBefore = s.diff.size(); - const bool altered = s.removeFromDiff(diff, hasMask); + const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes()); if (reply.getResult().success() && s.diff.size() == diffSizeBefore && !altered) |