author    Tor Egge <Tor.Egge@broadpark.no>    2020-10-29 15:09:10 +0100
committer Tor Egge <Tor.Egge@broadpark.no>    2020-10-29 15:09:10 +0100
commit    1554c84cdfb35d3d30e4bbb8e23eed021c83e060 (patch)
tree      16ac5f6203953dca3a9b6b4ce51745aad613a810 /storage/src
parent    6fe6ef140d3ce55dd74baa4df8761b9c05b2832d (diff)
Use source-only nodes again during bucket merge if the hasMask histogram shows
that some diff entries are unavailable on all other nodes.
Diffstat (limited to 'storage/src')
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp                  37
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp    2
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/mergestatus.h      1
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp             113
4 files changed, 99 insertions, 54 deletions
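
For context: each diff entry's _hasMask is a bitmask over the merge's node list, where bit i is set when the node at position i has that document revision. The fix histograms hasMask values masked down to the currently active nodes; if every entry collapses to mask 0, the remaining data exists only on already-eliminated source-only nodes, so the merge must bring those nodes back. A minimal self-contained sketch of that check (hypothetical DiffEntry type; it mirrors the histogram added to processBucketMerge below):

#include <cstdint>
#include <map>
#include <vector>

struct DiffEntry { uint16_t _hasMask; };

// True when no currently active node has any of the remaining diff
// entries, i.e. the data only survives on eliminated source-only nodes.
bool diff_unavailable_on_active_nodes(const std::vector<DiffEntry>& diff,
                                      uint16_t active_nodes_mask)
{
    std::map<uint16_t, uint32_t> counts; // histogram of masked hasMask values
    for (const auto& e : diff) {
        ++counts[e._hasMask & active_nodes_mask];
    }
    // A single histogram bucket at key 0 means every entry is
    // unavailable on all active nodes.
    return counts.size() == 1 && counts.begin()->first == 0u;
}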
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 02527883022..335863322d9 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -985,7 +985,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
- auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4);
+ auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x2);
diffCmd->getDiff() = dummyDiff->getDiff();
api::GetBucketDiffReply diffReply(*diffCmd);
@@ -1294,8 +1294,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
// Node 4 has been eliminated before the first ApplyBucketDiff command
EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
- EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]);
- EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]);
+ EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]);
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
@@ -1321,15 +1321,15 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
- fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
- diff[0]._entry._hasMask |= 2u;
+ // Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
+ diff[0]._entry._hasMask &= ~4u;
handler.handleApplyBucketDiffReply(*reply, messageKeeper());
LOG(debug, "handled second ApplyBucketDiffReply");
}
@@ -1341,7 +1341,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size, s.diff.size());
+ EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
@@ -1355,7 +1356,27 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
LOG(debug, "handled third ApplyBucketDiffReply");
}
ASSERT_EQ(5u, messageKeeper()._msgs.size());
- ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType());
+ ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[4]->getType());
+ {
+ LOG(debug, "checking fourth ApplyBucketDiff command");
+ EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+ auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ // All nodes in use again due to failure to fill diff entry for doc2
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ EXPECT_EQ(1u, s.diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
+ auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+ auto& diff = reply->getDiff();
+ EXPECT_EQ(1u, diff.size());
+ fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
+ diff[0]._entry._hasMask |= 2u;
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ LOG(debug, "handled fourth ApplyBucketDiffReply");
+ }
+ ASSERT_EQ(6u, messageKeeper()._msgs.size());
+ ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
}
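
A note on the hasMask constants in the test above: the bits are positional within a command's node list, so the same physical node maps to different bits when the node list shrinks. With the full merge node list {0, 1, 2, 3, 4}, mask 8u means only node 3 (bit 3) has the entry and 24u means nodes 3 and 4 have it; in cmd4's shortened node list {0, 1, 3}, node 3 sits at index 2, so clearing bit 2 (_hasMask &= ~4u) simulates node 3 losing doc2. A hypothetical helper illustrating the encoding (not part of the patch):

#include <cstdint>
#include <vector>

// Build a hasMask from the node list a command carries and the set of
// nodes that hold the document; bit positions follow the node list.
uint16_t make_has_mask(const std::vector<uint16_t>& node_list,
                       const std::vector<uint16_t>& nodes_with_doc)
{
    uint16_t mask = 0;
    for (size_t i = 0; i < node_list.size(); ++i) {
        for (uint16_t node : nodes_with_doc) {
            if (node_list[i] == node) {
                mask |= static_cast<uint16_t>(1u << i);
            }
        }
    }
    return mask;
}

// make_has_mask({0,1,2,3,4}, {3})   == 8u   (bit 3: node 3 only)
// make_has_mask({0,1,2,3,4}, {3,4}) == 24u  (bits 3 and 4)
// make_has_mask({0,1,3},     {3})   == 4u   (bit 2 in the shorter list)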
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index 2ecef59b567..2e390db69be 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -12,7 +12,7 @@ namespace storage {
MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& lt,
api::StorageMessage::Priority priority,
uint32_t traceLevel)
- : reply(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
+ : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock),
context(lt, priority, traceLevel)
{}
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index 18ced81c280..51930f337c6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -18,6 +18,7 @@ public:
using SP = std::shared_ptr<MergeStatus>;
std::shared_ptr<api::StorageReply> reply;
+ std::vector<api::MergeBucketCommand::Node> full_node_list;
std::vector<api::MergeBucketCommand::Node> nodeList;
framework::MicroSecTime maxTimestamp;
std::deque<api::GetBucketDiffCommand::Entry> diff;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 6e7fc30bd6c..51b575548d8 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -651,18 +651,22 @@ MergeHandler::applyDiffLocally(
}
namespace {
- void findCandidates(MergeStatus& status, bool constrictHasMask, uint16_t hasMask,
+ void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
{
uint32_t chunkSize = 0;
for (const auto& entry : status.diff) {
- if (constrictHasMask && entry._hasMask != hasMask) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
continue;
}
chunkSize += entry._bodySize + entry._headerSize;
cmd.getDiff().emplace_back(entry);
if (constrictHasMask) {
cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
@@ -690,52 +694,70 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
LOG(spam, "Processing merge of %s. %u entries left to merge.",
bucket.toString().c_str(), (uint32_t) status.diff.size());
std::shared_ptr<api::ApplyBucketDiffCommand> cmd;
-
- // If we still have a source only node, eliminate that one from the
- // merge.
- while (status.nodeList.back().sourceOnly) {
- std::vector<api::MergeBucketCommand::Node> nodes;
- for (const auto& node : status.nodeList) {
- if (!node.sourceOnly) {
- nodes.emplace_back(node);
+ std::map<uint16_t, uint32_t> counts;
+
+ uint16_t active_nodes_mask;
+ do {
+ active_nodes_mask = (1u << status.nodeList.size()) - 1;
+ // If we still have a source only node, eliminate that one from the
+ // merge.
+ while (status.nodeList.back().sourceOnly) {
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ for (const auto& node : status.nodeList) {
+ if (!node.sourceOnly) {
+ nodes.emplace_back(node);
+ }
+ }
+ nodes.push_back(status.nodeList.back());
+ assert(nodes.size() > 1);
+
+ cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
+ cmd->setAddress(createAddress(_clusterName, nodes[1].index));
+ findCandidates(status,
+ active_nodes_mask,
+ true,
+ 1 << (status.nodeList.size() - 1),
+ 1 << (nodes.size() - 1),
+ *cmd);
+ if (cmd->getDiff().size() != 0) {
+ break;
+ }
+ cmd.reset();
+ // If we found no data to merge from the last source only node,
+ // remove it and retry.
+ status.nodeList.pop_back();
+ active_nodes_mask = (1u << status.nodeList.size()) - 1;
+ // If only one node left in the merge, return ok.
+ if (status.nodeList.size() == 1) {
+ LOG(debug, "Done with merge of %s as there is only one node "
+ "that is not source only left in the merge.",
+ bucket.toString().c_str());
+ return status.reply;
}
}
- nodes.push_back(status.nodeList.back());
- assert(nodes.size() > 1);
-
- cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
- cmd->setAddress(createAddress(_clusterName, nodes[1].index));
- findCandidates(status,
- true,
- 1 << (status.nodeList.size() - 1),
- 1 << (nodes.size() - 1),
- *cmd);
- if (cmd->getDiff().size() != 0) break;
- cmd.reset();
- // If we found no data to merge from the last source only node,
- // remove it and retry. (Clear it out of the hasmask such that we
- // can match hasmask with operator==)
- status.nodeList.pop_back();
- uint16_t mask = ~(1 << status.nodeList.size());
- for (auto& e : status.diff) {
- e._hasMask &= mask;
- }
- // If only one node left in the merge, return ok.
- if (status.nodeList.size() == 1) {
- LOG(debug, "Done with merge of %s as there is only one node "
- "that is not source only left in the merge.",
- bucket.toString().c_str());
- return status.reply;
+ if (!cmd) {
+ // If we did not have a source only node, check if we have a path with
+ // many documents within it that we'll merge separately
+ counts.clear();
+ for (const auto& e : status.diff) {
+ ++counts[e._hasMask & active_nodes_mask];
+ }
+ if (counts.size() == 1 &&
+ counts.begin()->first == 0u &&
+ status.nodeList.size() < status.full_node_list.size()) {
+ // Diff not empty, but none of the remaining nodes have any merge entries.
+ // Bring back source only nodes that might still have merge entries.
+ status.nodeList = status.full_node_list;
+ continue;
+ }
}
- }
- // If we did not have a source only node, check if we have a path with
- // many documents within it that we'll merge separately
+ break;
+ } while (true);
if (!cmd) {
- std::map<uint16_t, uint32_t> counts;
- for (const auto& e : status.diff) {
- ++counts[e._hasMask];
- }
for (const auto& e : counts) {
+ if (e.first == 0u) {
+ continue;
+ }
if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
|| counts.size() == 1)
{
@@ -769,7 +791,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
cmd->setAddress(createAddress(_clusterName, nodes[1].index));
// Add all the metadata, and thus use big limit. Max
// data to fetch parameter will control amount added.
- findCandidates(status, true, e.first, newMask, *cmd);
+ findCandidates(status, active_nodes_mask, true, e.first, newMask, *cmd);
break;
}
}
@@ -780,7 +802,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
if ( ! cmd ) {
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), status.nodeList);
cmd->setAddress(createAddress(_clusterName, status.nodeList[1].index));
- findCandidates(status, false, 0, 0, *cmd);
+ findCandidates(status, active_nodes_mask, false, 0, 0, *cmd);
}
cmd->setPriority(status.context.getPriority());
cmd->setTimeout(status.timeout);
@@ -868,6 +890,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
_clock, cmd.getLoadType(),
cmd.getPriority(), cmd.getTrace().getLevel());
_env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
+ s->full_node_list = cmd.getNodes();
s->nodeList = cmd.getNodes();
s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
s->timeout = cmd.getTimeout();
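
A note on the design choice visible in the mergehandler.cpp hunks above: the old code destructively cleared an eliminated node's bit from every stored entry (e._hasMask &= mask), which discarded exactly the information needed to discover later that a source-only node still holds data. The rewritten findCandidates keeps the stored hasMask intact and applies active_nodes_mask at read time, so restoring full_node_list gives the retry loop complete information. A self-contained sketch of that read-time filter (simplified types and signature; the real function appends to an ApplyBucketDiffCommand and tracks chunk size):

#include <cstdint>
#include <vector>

struct Entry { uint16_t _hasMask; uint32_t _headerSize; uint32_t _bodySize; };

// Entries are filtered through active_nodes_mask instead of having
// eliminated nodes' bits permanently cleared from the stored hasMask.
std::vector<Entry> select_candidates(const std::vector<Entry>& diff,
                                     uint16_t active_nodes_mask,
                                     bool constrictHasMask,
                                     uint16_t hasMask, uint16_t newHasMask)
{
    std::vector<Entry> out;
    for (const auto& entry : diff) {
        uint16_t entry_has_mask = entry._hasMask & active_nodes_mask;
        if (entry_has_mask == 0u ||                    // no active node has it
            (constrictHasMask && entry_has_mask != hasMask)) {
            continue;
        }
        out.push_back(entry);
        // Propagate only the masked bits so receivers never see bits
        // belonging to nodes that are no longer part of this command.
        out.back()._hasMask = constrictHasMask ? newHasMask : entry_has_mask;
    }
    return out;
}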