-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp  199
1 file changed, 89 insertions(+), 110 deletions(-)
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 612d4545a8a..4d06c28078d 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -203,7 +203,7 @@ MergeHandler::buildBucketInfoList(
         if (dbInfo.valid()) {
             LOG(warning, "Prior to merging %s we found that storage "
                 "bucket database was out of sync with content "
-                "of file. Actual file content is %s while "
+                "of bucket. Actual bucket content is %s while "
                 "bucket database content was %s. Updating"
                 " bucket database to get in sync.",
                 bucket.toString().c_str(),
@@ -222,17 +222,16 @@ MergeHandler::buildBucketInfoList(
     std::vector<spi::DocEntry::UP> entries;
     populateMetaData(bucket, maxTimestamp, entries, context);
 
-    for (size_t i = 0; i < entries.size(); ++i) {
+    for (const auto& entry : entries) {
         api::GetBucketDiffCommand::Entry diff;
-        const spi::DocEntry& entry(*entries[i]);
 
         diff._gid = GlobalId();
         // We do not know doc sizes at this point, so just set to 0
         diff._headerSize = 0;
         diff._bodySize = 0;
-        diff._timestamp = entry.getTimestamp();
+        diff._timestamp = entry->getTimestamp();
         diff._flags = IN_USE
-                      | (entry.isRemove() ? DELETED : 0);
-        diff._hasMask = 1 << myNodeIndex;
+                      | (entry->isRemove() ? DELETED : 0);
+        diff._hasMask = 1U << myNodeIndex;
         output.push_back(diff);
 
         LOG(spam, "bucket info list of %s: Adding entry %s to diff",
@@ -256,11 +255,11 @@ namespace {
         if (!forwards && nodeIndex == 0) return false;
         uint32_t result = 1 << nodeIndex;
         uint32_t mask = 3 << (forwards ? nodeIndex : nodeIndex-1);
-        for (std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it
-                 = diff.begin(); it != diff.end(); ++it)
-        {
-            if (it->filled()) continue;
-            if ((it->_entry._hasMask & mask) == result) {
+        for (const auto& e : diff) {
+            if (e.filled()) {
+                continue;
+            }
+            if ((e._entry._hasMask & mask) == result) {
                 return true;
             }
         }
@@ -277,31 +276,24 @@ namespace {
     {
         uint32_t nodeMask = 1 << nodeIndex;
         bool foundEntries = false;
-        for (std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it
-                 = diff.begin(); it != diff.end(); ++it)
-        {
+        for (const auto& e : diff) {
             // Ignore entries we don't need locally
-            if ((it->_entry._hasMask & nodeMask) != 0) continue;
+            if ((e._entry._hasMask & nodeMask) != 0) {
+                continue;
+            }
             foundEntries = true;
-            if (it->filled()) return true;
+            if (e.filled()) {
+                return true;
+            }
         }
         if (foundEntries) {
-            LOG(spam, "Merge(): Found entries needed, but they don't contain "
-                "data");
+            LOG(spam, "Merge(): Found entries needed, but they don't contain data");
         }
         return false;
     }
 
-    int
-    countUnfilledEntries(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
-    {
-        int count = 0;
-
-        for (uint32_t i=0, n=diff.size(); i<n; ++i) {
-            if (!diff[i].filled()) count++;
-        }
-
-        return count;
+    int countUnfilledEntries(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) {
+        return std::count_if(diff.begin(), diff.end(), [](auto& e) { return !e.filled(); });
     };
 
     /**
@@ -315,8 +307,7 @@ namespace {
         return value;
     }
 
-    api::StorageMessageAddress createAddress(const std::string& clusterName, uint16_t node)
-    {
+    api::StorageMessageAddress createAddress(const std::string& clusterName, uint16_t node) {
         return api::StorageMessageAddress(clusterName, lib::NodeType::STORAGE, node);
     }
 
@@ -351,10 +342,9 @@ MergeHandler::fetchLocalData(
     // Preload documents in memory
     std::vector<spi::Timestamp> slots;
     uint32_t alreadyFilled = 0;
-    for (uint32_t i=0, n=diff.size(); i<n; ++i) {
-        api::ApplyBucketDiffCommand::Entry& e(diff[i]);
+    for (const auto& e : diff) {
         if ((e._entry._hasMask & nodeMask) != 0 && !e.filled()) {
-            slots.push_back(spi::Timestamp(e._entry._timestamp));
+            slots.emplace_back(e._entry._timestamp);
         }
         if (e.filled()) {
             alreadyFilled += e._headerBlob.size() + e._bodyBlob.size();
@@ -406,18 +396,18 @@ MergeHandler::fetchLocalData(
         throw std::runtime_error(ss.str());
     }
     auto list = result.steal_entries();
-    for (size_t i = 0; i < list.size(); ++i) {
-        if (list[i]->getSize() <= remainingSize
+    for (auto& entry : list) {
+        if (entry->getSize() <= remainingSize
             || (entries.empty() && alreadyFilled == 0)) {
-            remainingSize -= list[i]->getSize();
+            remainingSize -= entry->getSize();
             LOG(spam, "Added %s, remainingSize is %u",
                 entries.back()->toString().c_str(),
                 remainingSize);
-            entries.push_back(std::move(list[i]));
+            entries.push_back(std::move(entry));
         } else {
             LOG(spam, "Adding %s would exceed chunk size limit of %u; "
                 "not filling up any more diffs for current round",
-                list[i]->toString().c_str(), _maxChunkSize);
+                entry->toString().c_str(), _maxChunkSize);
             chunkLimitReached = true;
             break;
         }
@@ -432,22 +422,20 @@ MergeHandler::fetchLocalData(
     document::BucketIdFactory idFactory;
 
-    for (size_t i=0; i<entries.size(); ++i) {
-        const spi::DocEntry& docEntry(*entries[i]);
-        LOG(spam, "fetchLocalData: processing %s",
-            docEntry.toString().c_str());
+    for (const auto& entry_ptr : entries) {
+        const auto& docEntry = *entry_ptr;
+        LOG(spam, "fetchLocalData: processing %s", docEntry.toString().c_str());
 
-        std::vector<api::ApplyBucketDiffCommand::Entry>::iterator iter(
-            std::lower_bound(diff.begin(), diff.end(),
-                             api::Timestamp(docEntry.getTimestamp()),
-                             DiffEntryTimestampPredicate()));
+        auto iter = std::lower_bound(diff.begin(), diff.end(),
+                                     api::Timestamp(docEntry.getTimestamp()),
+                                     DiffEntryTimestampPredicate());
         assert(iter != diff.end());
         assert(iter->_entry._timestamp == docEntry.getTimestamp());
         api::ApplyBucketDiffCommand::Entry& e(*iter);
 
         if (!docEntry.isRemove()) {
             const Document* doc = docEntry.getDocument();
-            assert(doc != 0);
+            assert(doc != nullptr);
             assertContainedInBucket(doc->getId(), bucket, idFactory);
             e._docName = doc->getId().toString();
             vespalib::nbostream stream;
@@ -457,7 +445,7 @@ MergeHandler::fetchLocalData(
             e._bodyBlob.clear();
         } else {
             const DocumentId* docId = docEntry.getDocumentId();
-            assert(docId != 0);
+            assert(docId != nullptr);
             assertContainedInBucket(*docId, bucket, idFactory);
             if (e._entry._flags & DELETED) {
                 e._docName = docId->toString();
@@ -470,8 +458,7 @@ MergeHandler::fetchLocalData(
         e._repo = _env._repo.get();
     }
 
-    for (size_t i=0; i<diff.size(); ++i) {
-        api::ApplyBucketDiffCommand::Entry& e(diff[i]);
+    for (auto& e : diff) {
         if ((e._entry._hasMask & nodeMask) == 0 || e.filled()) {
             continue;
         }
@@ -587,8 +574,7 @@ MergeHandler::applyDiffLocally(
                 bucket.toString().c_str(), e.toString().c_str());
             applyDiffEntry(bucket, e, context, *repo);
         } else {
-            assert(spi::Timestamp(e._entry._timestamp)
-                   == existing.getTimestamp());
+            assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
             // Diffing for existing timestamp; should either both be put
             // dupes (which is a common case) or the new entry should be an
             // unrevertable remove.
@@ -671,24 +657,22 @@ namespace {
                         uint32_t maxSize,
                         api::ApplyBucketDiffCommand& cmd)
     {
         uint32_t chunkSize = 0;
-        for (std::deque<api::GetBucketDiffCommand::Entry>::const_iterator it
-                 = status.diff.begin(); it != status.diff.end(); ++it)
-        {
-            if (constrictHasMask && it->_hasMask != hasMask) {
+        for (const auto& entry : status.diff) {
+            if (constrictHasMask && entry._hasMask != hasMask) {
                 continue;
             }
             if (chunkSize != 0 &&
-                chunkSize + it->_bodySize + it->_headerSize > maxSize)
+                chunkSize + entry._bodySize + entry._headerSize > maxSize)
             {
                 LOG(spam, "Merge of %s used %d bytes, max is %d. Will "
                     "fetch in next merge round.",
                     id.toString().c_str(),
-                    chunkSize + it->_bodySize + it->_headerSize,
+                    chunkSize + entry._bodySize + entry._headerSize,
                     maxSize);
                 break;
             }
-            chunkSize += it->_bodySize + it->_headerSize;
-            cmd.getDiff().push_back(api::ApplyBucketDiffCommand::Entry(*it));
+            chunkSize += entry._bodySize + entry._headerSize;
+            cmd.getDiff().emplace_back(entry);
             if (constrictHasMask) {
                 cmd.getDiff().back()._entry._hasMask = newHasMask;
             }
@@ -710,7 +694,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     }
 
     // If nothing to update, we're done.
-    if (status.diff.size() == 0) {
+    if (status.diff.empty()) {
         LOG(debug, "Done with merge of %s. No more entries in diff.",
             bucket.toString().c_str());
         return status.reply;
     }
@@ -723,9 +707,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     // merge.
     while (status.nodeList.back().sourceOnly) {
         std::vector<api::MergeBucketCommand::Node> nodes;
-        for (uint16_t i=0; i<status.nodeList.size(); ++i) {
-            if (!status.nodeList[i].sourceOnly) {
-                nodes.push_back(status.nodeList[i]);
+        for (const auto& node : status.nodeList) {
+            if (!node.sourceOnly) {
+                nodes.emplace_back(node);
             }
         }
         nodes.push_back(status.nodeList.back());
@@ -735,7 +719,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
         // data to fetch parameter will control amount added.
         uint32_t maxSize =
             (_env._config.enableMergeLocalNodeChooseDocsOptimalization
-             ? std::numeric_limits<uint32_t>().max()
+             ? std::numeric_limits<uint32_t>::max()
              : _maxChunkSize);
         cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
@@ -749,17 +733,15 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
                        *cmd);
         if (cmd->getDiff().size() != 0) break;
         cmd.reset();
-            // If we found no data to merge from the last source only node,
-            // remove it and retry. (Clear it out of the hasmask such that we
-            // can match hasmask with operator==)
+        // If we found no data to merge from the last source only node,
+        // remove it and retry. (Clear it out of the hasmask such that we
+        // can match hasmask with operator==)
         status.nodeList.pop_back();
         uint16_t mask = ~(1 << status.nodeList.size());
-        for (std::deque<api::GetBucketDiffCommand::Entry>::iterator it
-                 = status.diff.begin(); it != status.diff.end(); ++it)
-        {
-            it->_hasMask &= mask;
+        for (auto& e : status.diff) {
+            e._hasMask &= mask;
         }
-            // If only one node left in the merge, return ok.
+        // If only one node left in the merge, return ok.
         if (status.nodeList.size() == 1) {
             LOG(debug, "Done with merge of %s as there is only one node "
                 "that is not source only left in the merge.",
@@ -767,40 +749,36 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
             return status.reply;
         }
     }
-        // If we did not have a source only node, check if we have a path with
-        // many documents within it that we'll merge separately
-    if (cmd.get() == 0) {
+    // If we did not have a source only node, check if we have a path with
+    // many documents within it that we'll merge separately
+    if (!cmd) {
         std::map<uint16_t, uint32_t> counts;
-        for (std::deque<api::GetBucketDiffCommand::Entry>::const_iterator it
-                 = status.diff.begin(); it != status.diff.end(); ++it)
-        {
-            ++counts[it->_hasMask];
+        for (const auto& e : status.diff) {
+            ++counts[e._hasMask];
         }
-        for (std::map<uint16_t, uint32_t>::const_iterator it = counts.begin();
-             it != counts.end(); ++it)
-        {
-            if (it->second >= uint32_t(_env._config.commonMergeChainOptimalizationMinimumSize)
+        for (const auto& e : counts) {
+            if (e.second >= uint32_t(_env._config.commonMergeChainOptimalizationMinimumSize)
                 || counts.size() == 1) {
                 LOG(spam, "Sending separate apply bucket diff for path %x "
                     "with size %u",
-                    it->first, it->second);
+                    e.first, e.second);
                 std::vector<api::MergeBucketCommand::Node> nodes;
-                    // This node always has to be first in chain.
+                // This node always has to be first in chain.
                 nodes.push_back(status.nodeList[0]);
-                    // Add all the nodes that lack the docs in question
-                for (uint16_t i=1; i<status.nodeList.size(); ++i) {
-                    if ((it->first & (1 << i)) == 0) {
+                // Add all the nodes that lack the docs in question
+                for (uint16_t i = 1; i < status.nodeList.size(); ++i) {
+                    if ((e.first & (1 << i)) == 0) {
                         nodes.push_back(status.nodeList[i]);
                     }
                 }
                 uint16_t newMask = 1;
-                    // If this node doesn't have the docs, add a node that has
-                    // them to the end of the chain, so the data is applied
-                    // going back.
-                if ((it->first & 1) == 0) {
+                // If this node doesn't have the docs, add a node that has
+                // them to the end of the chain, so the data is applied
+                // going back.
+                if ((e.first & 1) == 0) {
                     for (uint16_t i=1; i<status.nodeList.size(); ++i) {
-                        if ((it->first & (1 << i)) != 0) {
+                        if ((e.first & (1 << i)) != 0) {
                             nodes.push_back(status.nodeList[i]);
                             break;
                         }
@@ -810,13 +788,13 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
                 assert(nodes.size() > 1);
                 uint32_t maxSize =
                     (_env._config.enableMergeLocalNodeChooseDocsOptimalization
-                     ? std::numeric_limits<uint32_t>().max()
+                     ? std::numeric_limits<uint32_t>::max()
                      : _maxChunkSize);
                 cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
                 cmd->setAddress(createAddress(_env._component.getClusterName(), nodes[1].index));
-                    // Add all the metadata, and thus use big limit. Max
-                    // data to fetch parameter will control amount added.
-                findCandidates(bucket.getBucketId(), status, true, it->first, newMask, maxSize, *cmd);
+                // Add all the metadata, and thus use big limit. Max
+                // data to fetch parameter will control amount added.
+                findCandidates(bucket.getBucketId(), status, true, e.first, newMask, maxSize, *cmd);
                 break;
             }
         }
@@ -882,7 +860,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
     // Verify that first node is not source only, and that all source only
     // nodes are at end of chain
-    for (uint16_t i=0; i<cmd.getNodes().size(); ++i) {
+    for (uint16_t i = 0; i < cmd.getNodes().size(); ++i) {
         if (i == 0) {
             if (cmd.getNodes()[i].sourceOnly) {
                 tracker->fail(ReturnCode::ILLEGAL_PARAMETERS,
@@ -966,8 +944,9 @@ namespace {
             api::GetBucketDiffCommand::Entry, bool>
     {
         bool operator()(const api::GetBucketDiffCommand::Entry& x,
-                        const api::GetBucketDiffCommand::Entry& y) const
-            { return (x._timestamp < y._timestamp); }
+                        const api::GetBucketDiffCommand::Entry& y) const {
+            return (x._timestamp < y._timestamp);
+        }
     };
 
     /**
@@ -1109,15 +1088,15 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker
     if (index + 1u >= cmd.getNodes().size()) {
         // Remove entries everyone has from list first.
         uint16_t completeMask = 0;
-        for (uint32_t i=0; i<cmd.getNodes().size(); ++i) {
+        for (uint32_t i = 0; i < cmd.getNodes().size(); ++i) {
             if (!cmd.getNodes()[i].sourceOnly) {
                 completeMask |= (1 << i);
             }
         }
         std::vector<api::GetBucketDiffCommand::Entry> final;
-        for (uint32_t i=0, n=local.size(); i<n; ++i) {
-            if ((local[i]._hasMask & completeMask) != completeMask) {
-                final.push_back(local[i]);
+        for (const auto& e : local) {
+            if ((e._hasMask & completeMask) != completeMask) {
+                final.push_back(e);
             }
         }
         // Send reply
@@ -1308,11 +1287,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
         }
     }
     std::vector<api::ApplyBucketDiffCommand::Entry>& local(cmd.getDiff());
-    for (uint32_t i=0, n=local.size(); i<n; ++i) {
-        if ((local[i]._entry._hasMask & completeMask) == completeMask) {
-            local[i]._headerBlob.clear();
-            local[i]._bodyBlob.clear();
-            local[i]._docName.clear();
+    for (auto& e : local) {
+        if ((e._entry._hasMask & completeMask) == completeMask) {
+            e._headerBlob.clear();
+            e._bodyBlob.clear();
+            e._docName.clear();
         }
     }
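
The recurring pattern in this change is replacing hand-rolled index and iterator loops with range-based for and <algorithm> calls, and tightening the _hasMask bit bookkeeping. The sketch below is a minimal, self-contained illustration of those idioms, not the actual Vespa code: Entry is a simplified stand-in for api::ApplyBucketDiffCommand::Entry, and the mask layout is reduced to the parts the diff touches.

// Standalone sketch of the refactoring idioms used in this diff.
// Entry is a hypothetical stand-in; the real type carries blobs,
// timestamps and more state.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Entry {
    uint16_t _hasMask = 0;      // bit i set => node i has this document
    bool     _filled  = false;  // document data attached to the diff entry
    bool filled() const { return _filled; }
};

// Old style: manual index loop accumulating a count.
int countUnfilledEntriesOld(const std::vector<Entry>& diff) {
    int count = 0;
    for (uint32_t i = 0, n = diff.size(); i < n; ++i) {
        if (!diff[i].filled()) count++;
    }
    return count;
}

// New style: the same computation via std::count_if and a lambda.
int countUnfilledEntries(const std::vector<Entry>& diff) {
    return std::count_if(diff.begin(), diff.end(),
                         [](const Entry& e) { return !e.filled(); });
}

int main() {
    std::vector<Entry> diff(4);
    diff[1]._filled = true;
    assert(countUnfilledEntriesOld(diff) == countUnfilledEntries(diff));

    // hasMask bookkeeping as in the merge logic: mark node 3 as having
    // an entry. The unsigned literal (1U) mirrors the diff's change from
    // "1 << myNodeIndex", avoiding shifts on a signed int.
    uint16_t myNodeIndex = 3;
    diff[0]._hasMask |= uint16_t(1U << myNodeIndex);

    // Clear a node out of every entry's mask when it leaves the merge,
    // mirroring the status.nodeList.pop_back() path in processBucketMerge().
    uint16_t mask = uint16_t(~(1U << myNodeIndex));
    for (auto& e : diff) {
        e._hasMask &= mask;
    }
    assert(diff[0]._hasMask == 0);
    return 0;
}

The range-based loops and std::count_if are behavior-preserving rewrites of the removed iterator loops; the one semantic tweak in the diff itself is the switch to unsigned shift operands and the fix of std::numeric_limits<uint32_t>().max() (constructing a temporary) to the static std::numeric_limits<uint32_t>::max().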