Diffstat (limited to 'storage')
 storage/src/tests/persistence/mergehandlertest.cpp                |  37 ++++-
 storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp |   2 +-
 storage/src/vespa/storage/persistence/filestorage/mergestatus.h   |   1 +
 storage/src/vespa/storage/persistence/mergehandler.cpp            | 113 ++++---
4 files changed, 99 insertions, 54 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 02527883022..335863322d9 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -985,7 +985,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
     auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
     handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
     auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
-    auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4);
+    auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x2);
     diffCmd->getDiff() = dummyDiff->getDiff();

     api::GetBucketDiffReply diffReply(*diffCmd);
@@ -1294,8 +1294,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
         // Node 4 has been eliminated before the first ApplyBucketDiff command
         EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
         EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
-        EXPECT_EQ(EntryCheck(20000, 8u), s.diff[baseline_diff_size]);
-        EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size + 1]);
+        EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]);
+        EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]);
         auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
         // ApplyBucketDiffCommand has a shorter node list, node 2 is not present
         EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
@@ -1321,15 +1321,15 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
         auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
         EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
         EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
-        EXPECT_EQ(EntryCheck(20100, 8u), s.diff[baseline_diff_size]);
+        EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
         auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
         EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
         auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
         auto& diff = reply->getDiff();
         EXPECT_EQ(1u, diff.size());
         EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
-        fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
-        diff[0]._entry._hasMask |= 2u;
+        // Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
+        diff[0]._entry._hasMask &= ~4u;
         handler.handleApplyBucketDiffReply(*reply, messageKeeper());
         LOG(debug, "handled second ApplyBucketDiffReply");
     }
@@ -1341,7 +1341,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
         auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
         // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
         EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
-        EXPECT_EQ(baseline_diff_size, s.diff.size());
+        EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
+        EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
         auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
         EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
         auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
@@ -1355,7 +1356,27 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
         LOG(debug, "handled third ApplyBucketDiffReply");
     }
     ASSERT_EQ(5u, messageKeeper()._msgs.size());
-    ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[4]->getType());
+    ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[4]->getType());
+    {
+        LOG(debug, "checking fourth ApplyBucketDiff command");
+        EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
+        auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+        // All nodes in use again due to failure to fill diff entry for doc2
+        EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+        EXPECT_EQ(1u, s.diff.size());
+        EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
+        auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
+        EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
+        auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+        auto& diff = reply->getDiff();
+        EXPECT_EQ(1u, diff.size());
+        fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
+        diff[0]._entry._hasMask |= 2u;
+        handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+        LOG(debug, "handled fourth ApplyBucketDiffReply");
+    }
+    ASSERT_EQ(6u, messageKeeper()._msgs.size());
+    ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
     LOG(debug, "got mergebucket reply");
 }
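Note on the mask arithmetic in the test above: _hasMask appears to be a bitmask in which bit i is set when the node at position i of the command's node list holds the diff entry, so "&= ~4u" clears bit 2, and in cmd4's node list {0, 1, 3} position 2 is storage node 3. The following is a minimal standalone sketch of that convention; the mask values and node lists come from the hunks above, everything else (the main function, variable names) is illustrative rather than Vespa API:

#include <cassert>
#include <cstdint>

// Illustrative only: bit i of the mask is set when the node at position i
// of the command's node list holds the diff entry. The second
// ApplyBucketDiff above has node list {0, 1, 3}, so bit 2 (value 4u)
// stands for storage node 3.
int main() {
    uint16_t hasMask = 0x4;            // entry present only on node 3
    hasMask = uint16_t(hasMask & ~4u); // simulate node 3 losing the doc
    assert(hasMask == 0u);             // no node in the command has it now
    // The replaced test code instead recorded a successful fill on the
    // node at position 1 (bit value 2u): hasMask = uint16_t(hasMask | 2u);
    return 0;
}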
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index 2ecef59b567..2e390db69be 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -12,7 +12,7 @@ namespace storage {
 MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType& lt,
                          api::StorageMessage::Priority priority,
                          uint32_t traceLevel)
-    : reply(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
+    : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
       pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock),
       context(lt, priority, traceLevel)
 {}
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index 18ced81c280..51930f337c6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -18,6 +18,7 @@ public:
     using SP = std::shared_ptr<MergeStatus>;

     std::shared_ptr<api::StorageReply> reply;
+    std::vector<api::MergeBucketCommand::Node> full_node_list;
     std::vector<api::MergeBucketCommand::Node> nodeList;
     framework::MicroSecTime maxTimestamp;
     std::deque<api::GetBucketDiffCommand::Entry> diff;
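The new full_node_list member preserves the node set from the original MergeBucketCommand, while nodeList shrinks as source-only nodes are eliminated; mergehandler.cpp (below) compares the two to decide when eliminated nodes must be brought back. A rough sketch of that decision under simplified, hypothetical types (Node, Status, and maybe_restore are stand-ins, not the real classes):

#include <cstdint>
#include <vector>

// Hypothetical stand-ins; only the fields the restore decision needs.
struct Node { uint16_t index; bool sourceOnly; };

struct Status {
    std::vector<Node> full_node_list; // nodes from the original MergeBucketCommand
    std::vector<Node> nodeList;       // shrinks as nodes are eliminated
};

// Bring eliminated nodes back when the diff is non-empty but none of the
// remaining nodes hold any of the outstanding entries.
void maybe_restore(Status& status, bool no_active_node_has_entries) {
    if (no_active_node_has_entries &&
        status.nodeList.size() < status.full_node_list.size()) {
        status.nodeList = status.full_node_list; // retry with all nodes
    }
}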
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 6e7fc30bd6c..51b575548d8 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -651,18 +651,22 @@ MergeHandler::applyDiffLocally(
 }

 namespace {
-    void findCandidates(MergeStatus& status, bool constrictHasMask, uint16_t hasMask,
+    void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
                         uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
    {
        uint32_t chunkSize = 0;
        for (const auto& entry : status.diff) {
-            if (constrictHasMask && entry._hasMask != hasMask) {
+            uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+            if ((entry_has_mask == 0u) ||
+                (constrictHasMask && (entry_has_mask != hasMask))) {
                continue;
            }
            chunkSize += entry._bodySize + entry._headerSize;
            cmd.getDiff().emplace_back(entry);
            if (constrictHasMask) {
                cmd.getDiff().back()._entry._hasMask = newHasMask;
+            } else {
+                cmd.getDiff().back()._entry._hasMask = entry_has_mask;
            }
        }
    }
@@ -690,52 +694,70 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     LOG(spam, "Processing merge of %s. %u entries left to merge.",
         bucket.toString().c_str(), (uint32_t) status.diff.size());
     std::shared_ptr<api::ApplyBucketDiffCommand> cmd;
-
-    // If we still have a source only node, eliminate that one from the
-    // merge.
-    while (status.nodeList.back().sourceOnly) {
-        std::vector<api::MergeBucketCommand::Node> nodes;
-        for (const auto& node : status.nodeList) {
-            if (!node.sourceOnly) {
-                nodes.emplace_back(node);
+    std::map<uint16_t, uint32_t> counts;
+
+    uint16_t active_nodes_mask;
+    do {
+        active_nodes_mask = (1u << status.nodeList.size()) - 1;
+        // If we still have a source only node, eliminate that one from the
+        // merge.
+        while (status.nodeList.back().sourceOnly) {
+            std::vector<api::MergeBucketCommand::Node> nodes;
+            for (const auto& node : status.nodeList) {
+                if (!node.sourceOnly) {
+                    nodes.emplace_back(node);
+                }
+            }
+            nodes.push_back(status.nodeList.back());
+            assert(nodes.size() > 1);
+
+            cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
+            cmd->setAddress(createAddress(_clusterName, nodes[1].index));
+            findCandidates(status,
+                           active_nodes_mask,
+                           true,
+                           1 << (status.nodeList.size() - 1),
+                           1 << (nodes.size() - 1),
+                           *cmd);
+            if (cmd->getDiff().size() != 0) {
+                break;
+            }
+            cmd.reset();
+            // If we found no data to merge from the last source only node,
+            // remove it and retry.
+            status.nodeList.pop_back();
+            active_nodes_mask = (1u << status.nodeList.size()) - 1;
+            // If only one node left in the merge, return ok.
+            if (status.nodeList.size() == 1) {
+                LOG(debug, "Done with merge of %s as there is only one node "
+                           "that is not source only left in the merge.",
+                    bucket.toString().c_str());
+                return status.reply;
            }
        }
-        nodes.push_back(status.nodeList.back());
-        assert(nodes.size() > 1);
-
-        cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
-        cmd->setAddress(createAddress(_clusterName, nodes[1].index));
-        findCandidates(status,
-                       true,
-                       1 << (status.nodeList.size() - 1),
-                       1 << (nodes.size() - 1),
-                       *cmd);
-        if (cmd->getDiff().size() != 0) break;
-        cmd.reset();
-        // If we found no data to merge from the last source only node,
-        // remove it and retry. (Clear it out of the hasmask such that we
-        // can match hasmask with operator==)
-        status.nodeList.pop_back();
-        uint16_t mask = ~(1 << status.nodeList.size());
-        for (auto& e : status.diff) {
-            e._hasMask &= mask;
-        }
-        // If only one node left in the merge, return ok.
-        if (status.nodeList.size() == 1) {
-            LOG(debug, "Done with merge of %s as there is only one node "
-                       "that is not source only left in the merge.",
-                bucket.toString().c_str());
-            return status.reply;
+        if (!cmd) {
+            // If we did not have a source only node, check if we have a path with
+            // many documents within it that we'll merge separately
+            counts.clear();
+            for (const auto& e : status.diff) {
+                ++counts[e._hasMask & active_nodes_mask];
+            }
+            if (counts.size() == 1 &&
+                counts.begin()->first == 0u &&
+                status.nodeList.size() < status.full_node_list.size()) {
+                // Diff not empty, but none of the remaining nodes have any merge entries.
+                // Bring back source only nodes that might still have merge entries.
+                status.nodeList = status.full_node_list;
+                continue;
+            }
        }
-    }
-    // If we did not have a source only node, check if we have a path with
-    // many documents within it that we'll merge separately
+        break;
+    } while (true);
    if (!cmd) {
-        std::map<uint16_t, uint32_t> counts;
-        for (const auto& e : status.diff) {
-            ++counts[e._hasMask];
-        }
        for (const auto& e : counts) {
+            if (e.first == 0u) {
+                continue;
+            }
            if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
                || counts.size() == 1)
            {
@@ -769,7 +791,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
                 cmd->setAddress(createAddress(_clusterName, nodes[1].index));
                 // Add all the metadata, and thus use big limit. Max
                 // data to fetch parameter will control amount added.
-                findCandidates(status, true, e.first, newMask, *cmd);
+                findCandidates(status, active_nodes_mask, true, e.first, newMask, *cmd);
                 break;
             }
         }
@@ -780,7 +802,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     if ( ! cmd ) {
         cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), status.nodeList);
         cmd->setAddress(createAddress(_clusterName, status.nodeList[1].index));
-        findCandidates(status, false, 0, 0, *cmd);
+        findCandidates(status, active_nodes_mask, false, 0, 0, *cmd);
     }
     cmd->setPriority(status.context.getPriority());
     cmd->setTimeout(status.timeout);
@@ -868,6 +890,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
             _clock, cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel());
     _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
+    s->full_node_list = cmd.getNodes();
     s->nodeList = cmd.getNodes();
     s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
     s->timeout = cmd.getTimeout();
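The core of the mergehandler.cpp change: findCandidates now takes an active_nodes_mask and filters each entry's stored _hasMask at read time, where the old code destructively cleared the popped node's bit from every diff entry. Keeping the stored masks intact is what appears to let processBucketMerge restore status.full_node_list when no active node holds the remaining entries. A standalone model of that masking idea (Entry and the sample values here are illustrative, not the real api types):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative model: bit i of hasMask means the node at position i in the
// merge's node list has the entry. Values below are made up for the demo.
struct Entry { uint16_t hasMask; };

int main() {
    std::vector<Entry> diff = {{0x03}, {0x04}, {0x18}};
    std::size_t active_nodes = 2; // nodes at positions 0 and 1 remain
    uint16_t active_nodes_mask = uint16_t((1u << active_nodes) - 1); // 0x03
    for (const Entry& e : diff) {
        uint16_t effective = uint16_t(e.hasMask & active_nodes_mask);
        if (effective == 0u) {
            // Only eliminated nodes hold this entry; since e.hasMask was
            // never cleared, restoring nodes from full_node_list makes the
            // entry mergeable again instead of silently dropping it.
            std::printf("entry 0x%x held only by eliminated nodes\n",
                        static_cast<unsigned>(e.hasMask));
        }
    }
    return 0;
}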