author     Tor Brede Vekterli <vekterli@verizonmedia.com>  2020-10-27 15:20:15 +0100
committer  GitHub <noreply@github.com>                     2020-10-27 15:20:15 +0100
commit     4d64cbe4e531cb7be1061f1f54809d1d0a1b0061 (patch)
tree       4eed3c0bb7009155a6e93aca0bf426445fcb759e /storage
parent     b8a721b2dfedd10ede9cf571fb3fb8060449b14a (diff)
parent     91eb5d5e3a8ab31803569e0e5d68ff1d217d3853 (diff)
Merge pull request #15041 from vespa-engine/toregge/add-unit-test-for-bucket-merge-with-partially-filled-diff-from-last-source-only-node
Add unit test for bucket merge with partially filled diff
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp                 188
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp   85
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.h      4
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp               2
4 files changed, 268 insertions, 11 deletions
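
The heart of this change is that hasMask values are positional: bit i refers to the node at position i of whichever node list the mask was computed against. Since later ApplyBucketDiff rounds can involve a shorter node list than the full merge, MergeStatus::removeFromDiff() now also receives the reply's node list and remaps per-reply masks onto the merge's full node list. Below is a minimal standalone sketch of that remapping idea; Node and remap_mask are simplified stand-ins, not the Vespa types.

    // Minimal sketch of positional hasMask remapping (not the Vespa classes).
    #include <cassert>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    struct Node { uint16_t index; bool sourceOnly; };

    // Translate a mask whose bit positions follow reply_nodes into a mask whose
    // bit positions follow all_nodes (same idea as MaskRemapper in mergestatus.cpp).
    uint16_t remap_mask(uint16_t mask,
                        const std::vector<Node>& all_nodes,
                        const std::vector<Node>& reply_nodes)
    {
        std::unordered_map<uint16_t, uint16_t> node_index_to_bit;
        for (size_t i = 0; i < all_nodes.size(); ++i) {
            node_index_to_bit[all_nodes[i].index] = uint16_t(1u << i);
        }
        uint16_t new_mask = 0;
        for (size_t i = 0; i < reply_nodes.size(); ++i) {
            if (mask & (1u << i)) {
                new_mask |= node_index_to_bit.at(reply_nodes[i].index);
            }
        }
        return new_mask;
    }

    int main() {
        // Full merge node list: copies on nodes 0 and 1, source-only nodes 2, 3 and 4.
        std::vector<Node> all{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}};
        // An ApplyBucketDiff round that only involves nodes 0, 1 and 3.
        std::vector<Node> reply{{0, false}, {1, false}, {3, true}};
        // Per-reply mask 0x6: set for the nodes at positions 1 and 2, i.e. nodes 1 and 3.
        // In the full list node 1 is bit 1 and node 3 is bit 3, so the result is 0xA.
        assert(remap_mask(0x6, all, reply) == 0xA);
        return 0;
    }
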
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index cda570a4396..7dad8260492 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -1089,7 +1089,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x7;
- EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_TRUE(status.diff.empty());
}
@@ -1105,7 +1105,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x6;
- EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_EQ(2, status.diff.size());
}
@@ -1123,7 +1123,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
applyDiff[1]._entry._flags = 0x3;
applyDiff[1]._entry._hasMask = 0x5;
- EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
+ EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7, status.nodeList));
EXPECT_EQ(2, status.diff.size());
}
}
@@ -1175,4 +1175,186 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
EXPECT_TRUE(foundTimestamp);
}
+namespace {
+
+storage::api::GetBucketDiffCommand::Entry
+make_entry(uint64_t timestamp, uint16_t mask) {
+ storage::api::GetBucketDiffCommand::Entry entry;
+ entry._timestamp = timestamp;
+ entry._gid = document::GlobalId();
+ entry._headerSize = 0;
+ entry._bodySize = 0;
+ entry._flags = MergeHandler::StateFlag::IN_USE;
+ entry._hasMask = mask;
+ return entry;
+}
+
+void
+fill_entry(storage::api::ApplyBucketDiffCommand::Entry &e, const document::Document& doc, const document::DocumentTypeRepo &repo)
+{
+ e._docName = doc.getId().toString();
+ vespalib::nbostream stream;
+ doc.serialize(stream);
+ e._headerBlob.resize(stream.size());
+ memcpy(&e._headerBlob[0], stream.peek(), stream.size());
+ e._repo = &repo;
+}
+
+/*
+ * Helper class to check both timestamp and mask at once.
+ */
+struct EntryCheck
+{
+ uint64_t _timestamp;
+ uint16_t _hasMask;
+
+ EntryCheck(uint64_t timestamp, uint16_t hasMask)
+ : _timestamp(timestamp),
+ _hasMask(hasMask)
+ {
+ }
+ bool operator==(const api::GetBucketDiffCommand::Entry &rhs) const {
+ return _timestamp == rhs._timestamp && _hasMask == rhs._hasMask;
+ }
+};
+
+std::ostream &operator<<(std::ostream &os, const EntryCheck &entry)
+{
+ os << "EntryCheck(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")";
+ return os;
+}
+
+}
+
+namespace api {
+
+std::ostream &operator<<(std::ostream &os, const MergeBucketCommand::Node &node)
+{
+ os << "Node(" << node.index << "," << (node.sourceOnly ? "true" : "false") << ")";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &entry)
+{
+ os << "Entry(timestamp=" << entry._timestamp << ", hasMask=" << entry._hasMask << ")";
+ return os;
+}
+
+}
+
+TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+{
+ using NodeList = decltype(_nodes);
+ // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
+ _nodes.clear();
+ _nodes.emplace_back(0, false);
+ _nodes.emplace_back(1, false);
+ _nodes.emplace_back(2, true);
+ _nodes.emplace_back(3, true);
+ _nodes.emplace_back(4, true);
+ _maxTimestamp = 30000; // Extend timestamp range to include doc1 and doc2
+
+ auto doc1 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 1);
+ auto doc2 = _env->_testDocMan.createRandomDocumentAtLocation(_location, 2);
+
+ MergeHandler handler = createHandler();
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ cmd->setSourceIndex(1234);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
+ ASSERT_EQ(1u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
+ size_t baseline_diff_size = 0;
+ {
+ LOG(debug, "checking GetBucketDiff command");
+ auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]);
+ EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes()));
+ EXPECT_EQ(1, cmd2.getAddress()->getIndex());
+ EXPECT_EQ(1234, cmd2.getSourceIndex());
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ baseline_diff_size = cmd2.getDiff().size();
+ auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
+ auto &diff = reply->getDiff();
+ // doc1 and doc2 are present on nodes 3 and 4.
+ diff.push_back(make_entry(20000, ((1 << 3) | (1 << 4))));
+ diff.push_back(make_entry(20100, ((1 << 3) | (1 << 4))));
+ EXPECT_EQ(baseline_diff_size + 2u, reply->getDiff().size());
+ handler.handleGetBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "sent handleGetBucketDiffReply");
+ }
+ ASSERT_EQ(2u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
+ {
+ LOG(debug, "checking first ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ // Node 4 has been eliminated before the first ApplyBucketDiff command
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]);
+ auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
+ // ApplyBucketDiffCommand has a shorter node list; node 2 is not present
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(2u, diff.size());
+ EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
+ EXPECT_EQ(EntryCheck(20100u, 4u), diff[1]._entry);
+ /*
+ * Only fill first diff entry to simulate max chunk size being exceeded
+ * when filling diff entries on source node (node 3).
+ */
+ fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
+ diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1.
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled first ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(3u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[2]->getType());
+ {
+ LOG(debug, "checking second ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]);
+ auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(1u, diff.size());
+ EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
+ fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
+ diff[0]._entry._hasMask |= 2u;
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled second ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(4u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[3]->getType());
+ {
+ LOG(debug, "checking third ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
+ EXPECT_EQ(baseline_diff_size, s.diff.size());
+ auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(baseline_diff_size, diff.size());
+ for (auto& e : diff) {
+ EXPECT_EQ(1u, e._entry._hasMask);
+ e._entry._hasMask |= 2u;
+ }
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled third ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(5u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType());
+ LOG(debug, "got mergebucket reply");
+}
+
} // storage
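
The mask values expected by the new test above follow from that positional encoding: relative to the full node list {0,1,2,3,4}, "present only on node 3" is 1<<3 = 0x8, while relative to the ApplyBucketDiff node list {0,1,3} the same fact is 1<<2 = 0x4, and "also applied on node 1" adds 1<<1 = 0x2. A tiny self-contained illustration (the values mirror the test; the bit() helper is hypothetical, not a Vespa API):

    #include <cstdint>

    // Positional hasMask encoding: bit i means "the node at position i in the
    // relevant node list has this document version".
    constexpr uint16_t bit(unsigned pos) { return uint16_t(1u << pos); }

    // Full merge node list {0,1,2,3,4}: node 3 sits at position 3.
    static_assert(bit(3) == 0x8, "per-merge mask: only node 3 has the entry");
    // ApplyBucketDiff node list {0,1,3}: node 3 sits at position 2.
    static_assert(bit(2) == 0x4, "per-reply mask: only node 3 has the entry");
    // Node 1 is at position 1 in both lists, so applying the entry there adds bit 1.
    static_assert((bit(2) | bit(1)) == 0x6, "per-reply mask after apply on node 1");

    int main() { return 0; }
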
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index 6934f4754bb..97d3da28abe 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -3,11 +3,70 @@
#include "mergestatus.h"
#include <ostream>
#include <vespa/log/log.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <cassert>
LOG_SETUP(".mergestatus");
namespace storage {
+namespace {
+
+/*
+ * Class for remapping bit masks from a partial set of nodes to a full
+ * set of nodes.
+ */
+class MaskRemapper
+{
+ std::vector<uint16_t> _mask_remap;
+
+public:
+ MaskRemapper(const std::vector<api::MergeBucketCommand::Node> &all_nodes,
+ const std::vector<api::MergeBucketCommand::Node> &nodes);
+ ~MaskRemapper();
+
+ uint16_t operator()(uint16_t mask) const;
+};
+
+MaskRemapper::MaskRemapper(const std::vector<api::MergeBucketCommand::Node> &all_nodes,
+ const std::vector<api::MergeBucketCommand::Node> &nodes)
+ : _mask_remap()
+{
+ if (nodes != all_nodes) {
+ vespalib::hash_map<uint32_t, uint32_t> node_index_to_mask(all_nodes.size());
+ uint16_t mask = 1u;
+ for (const auto& node : all_nodes) {
+ node_index_to_mask[node.index] = mask;
+ mask <<= 1;
+ }
+ _mask_remap.reserve(nodes.size());
+ for (const auto& node : nodes) {
+ mask = node_index_to_mask[node.index];
+ assert(mask != 0u);
+ _mask_remap.push_back(mask);
+ }
+ }
+}
+
+MaskRemapper::~MaskRemapper() = default;
+
+uint16_t
+MaskRemapper::operator()(uint16_t mask) const
+{
+ if (!_mask_remap.empty()) {
+ uint16_t new_mask = 0u;
+ for (uint32_t i = 0u; i < _mask_remap.size(); ++i) {
+ if ((mask & (1u << i)) != 0u) {
+ new_mask |= _mask_remap[i];
+ }
+ }
+ mask = new_mask;
+ }
+ return mask;
+}
+
+}
+
MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& lt,
api::StorageMessage::Priority priority,
uint32_t traceLevel)
@@ -18,15 +77,21 @@ MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType&
MergeStatus::~MergeStatus() = default;
+/*
+ * Note: hasMask parameter and _entry._hasMask in part vector are per-reply masks,
+ * based on the nodes returned in the ApplyBucketDiffReply.
+ */
bool
MergeStatus::removeFromDiff(
const std::vector<api::ApplyBucketDiffCommand::Entry>& part,
- uint16_t hasMask)
+ uint16_t hasMask,
+ const std::vector<api::MergeBucketCommand::Node> &nodes)
{
std::deque<api::GetBucketDiffCommand::Entry>::iterator it(diff.begin());
std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it2(
part.begin());
bool altered = false;
+ MaskRemapper remap_mask(nodeList, nodes);
// We expect part array to be sorted in the same order as in the diff,
// and that all entries in the part should exist in the source list.
while (it != diff.end() && it2 != part.end()) {
@@ -62,11 +127,19 @@ MergeStatus::removeFromDiff(
}
it = diff.erase(it);
altered = true;
- } else if (it2->_entry._hasMask != it->_hasMask) {
- // Hasmasks have changed, meaning bucket contents changed on
- // one or more of the nodes during merging.
- altered = true;
- it->_hasMask = it2->_entry._hasMask;
+ } else {
+ /*
+ * Remap from per-reply mask for the ApplyBucketDiffReply to a
+ * per-merge-operation mask with same bit assignment as _hasMask in
+ * the diff vector.
+ */
+ uint16_t mask = remap_mask(it2->_entry._hasMask);
+ if (mask != it->_hasMask) {
+ // Hasmasks have changed, meaning bucket contents changed on
+ // one or more of the nodes during merging.
+ altered = true;
+ it->_hasMask = mask;
+ }
}
++it2;
}
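
The second hunk above only touches the mask-mismatch branch: the reply's mask must be remapped before it is compared with (and possibly stored into) the per-merge mask kept in the diff vector, otherwise a reply carrying a shorter node list would look like a content change during the merge. A small self-contained illustration, with values assumed to mirror the test scenario:

    #include <cassert>
    #include <cstdint>

    int main() {
        // Per-merge mask stored in the diff: only node 3 (position 3 of {0,1,2,3,4}) has the entry.
        uint16_t stored = 1u << 3;     // 0x8
        // The same fact reported in a reply whose node list is {0,1,3}: node 3 is at position 2.
        uint16_t per_reply = 1u << 2;  // 0x4
        // Comparing the raw per-reply mask would wrongly flag the bucket as changed...
        assert(per_reply != stored);
        // ...whereas the remapped mask (position 2 -> node 3 -> bit 3) matches the stored one.
        uint16_t remapped = 1u << 3;
        assert(remapped == stored);
        return 0;
    }
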
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index 93a4b96c1a0..18ced81c280 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -32,11 +32,13 @@ public:
~MergeStatus();
/**
+ * Note: hasMask parameter and _entry._hasMask in part vector are per-reply masks,
+ * based on the nodes returned in ApplyBucketDiffReply.
* @return true if any entries were removed from the internal diff
* or the two diffs had entries with mismatching hasmasks, which
* indicates that bucket contents have changed during the merge.
*/
- bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask);
+ bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes);
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool isFirstNode() const { return (reply.get() != 0); }
};
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index cabd74f6777..6e7fc30bd6c 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1355,7 +1355,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
const size_t diffSizeBefore = s.diff.size();
- const bool altered = s.removeFromDiff(diff, hasMask);
+ const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes());
if (reply.getResult().success()
&& s.diff.size() == diffSizeBefore
&& !altered)