authorGeir Storli <geirst@verizonmedia.com>2020-10-05 17:42:12 +0200
committerGitHub <noreply@github.com>2020-10-05 17:42:12 +0200
commit0796d876df1a53a28b70901e5094511aae5547e8 (patch)
treef7fa87e2a2e60caba6b79d250983cf077b3e500c
parent6d831da50caa8f75732a1343bfc33e5b69fc470e (diff)
parent0197f19d1f4da6f2593f80ff56ee18cad9595407 (diff)
Merge pull request #14729 from vespa-engine/vekterli/merge-handler-code-cleanup
Code cleanup of MergeHandler. No functional changes.
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp  199
1 file changed, 89 insertions, 110 deletions
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 612d4545a8a..4d06c28078d 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -203,7 +203,7 @@ MergeHandler::buildBucketInfoList(
if (dbInfo.valid()) {
LOG(warning, "Prior to merging %s we found that storage "
"bucket database was out of sync with content "
- "of file. Actual file content is %s while "
+ "of bucket. Actual bucket content is %s while "
"bucket database content was %s. Updating"
" bucket database to get in sync.",
bucket.toString().c_str(),
@@ -222,17 +222,16 @@ MergeHandler::buildBucketInfoList(
std::vector<spi::DocEntry::UP> entries;
populateMetaData(bucket, maxTimestamp, entries, context);
- for (size_t i = 0; i < entries.size(); ++i) {
+ for (const auto& entry : entries) {
api::GetBucketDiffCommand::Entry diff;
- const spi::DocEntry& entry(*entries[i]);
diff._gid = GlobalId();
// We do not know doc sizes at this point, so just set to 0
diff._headerSize = 0;
diff._bodySize = 0;
- diff._timestamp = entry.getTimestamp();
+ diff._timestamp = entry->getTimestamp();
diff._flags = IN_USE
- | (entry.isRemove() ? DELETED : 0);
- diff._hasMask = 1 << myNodeIndex;
+ | (entry->isRemove() ? DELETED : 0);
+ diff._hasMask = 1U << myNodeIndex;
output.push_back(diff);
LOG(spam, "bucket info list of %s: Adding entry %s to diff",
@@ -256,11 +255,11 @@ namespace {
if (!forwards && nodeIndex == 0) return false;
uint32_t result = 1 << nodeIndex;
uint32_t mask = 3 << (forwards ? nodeIndex : nodeIndex-1);
- for (std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it
- = diff.begin(); it != diff.end(); ++it)
- {
- if (it->filled()) continue;
- if ((it->_entry._hasMask & mask) == result) {
+ for (const auto& e : diff) {
+ if (e.filled()) {
+ continue;
+ }
+ if ((e._entry._hasMask & mask) == result) {
return true;
}
}
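The candidate check relies on compact mask arithmetic: 3 << n selects two adjacent hasMask bits, so (hasMask & mask) == result holds exactly when this node has the entry but its chain neighbour (the next node when forwards, the previous one otherwise) does not. A worked sketch with hypothetical masks:

    #include <cstdint>
    #include <cstdio>

    // True if, within the two-node window, only nodeIndex holds the entry.
    bool onlyThisNodeInWindow(uint32_t hasMask, uint32_t nodeIndex, bool forwards) {
        if (!forwards && nodeIndex == 0) return false;   // no previous neighbour to compare
        uint32_t result = 1U << nodeIndex;
        uint32_t mask = 3U << (forwards ? nodeIndex : nodeIndex - 1);
        return (hasMask & mask) == result;
    }

    int main() {
        uint32_t hasMask = 0b0101;  // nodes 0 and 2 have the entry
        std::printf("%d\n", onlyThisNodeInWindow(hasMask, 2, true));   // 1: node 3 lacks it
        std::printf("%d\n", onlyThisNodeInWindow(hasMask, 2, false));  // 1: node 1 lacks it
        std::printf("%d\n", onlyThisNodeInWindow(hasMask, 0, true));   // 1: node 1 lacks it
        std::printf("%d\n", onlyThisNodeInWindow(0b0011, 0, true));    // 0: node 1 has it too
    }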
@@ -277,31 +276,24 @@ namespace {
{
uint32_t nodeMask = 1 << nodeIndex;
bool foundEntries = false;
- for (std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it
- = diff.begin(); it != diff.end(); ++it)
- {
+ for (const auto& e : diff) {
// Ignore entries we don't need locally
- if ((it->_entry._hasMask & nodeMask) != 0) continue;
+ if ((e._entry._hasMask & nodeMask) != 0) {
+ continue;
+ }
foundEntries = true;
- if (it->filled()) return true;
+ if (e.filled()) {
+ return true;
+ }
}
if (foundEntries) {
- LOG(spam, "Merge(): Found entries needed, but they don't contain "
- "data");
+ LOG(spam, "Merge(): Found entries needed, but they don't contain data");
}
return false;
}
- int
- countUnfilledEntries(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
- {
- int count = 0;
-
- for (uint32_t i=0, n=diff.size(); i<n; ++i) {
- if (!diff[i].filled()) count++;
- }
-
- return count;
+ int countUnfilledEntries(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) {
+ return std::count_if(diff.begin(), diff.end(), [](auto& e) { return !e.filled(); });
};
/**
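The rewritten countUnfilledEntries is the textbook count_if reduction. A self-contained sketch of the equivalence, with a simplified Entry in place of api::ApplyBucketDiffCommand::Entry:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    struct Entry {
        bool _filled = false;
        bool filled() const { return _filled; }
    };

    int countUnfilledEntries(const std::vector<Entry>& diff) {
        // count_if returns a difference_type; the int return mirrors the original signature
        return std::count_if(diff.begin(), diff.end(),
                             [](const auto& e) { return !e.filled(); });
    }

    int main() {
        std::vector<Entry> diff{{true}, {false}, {false}};
        assert(countUnfilledEntries(diff) == 2);
    }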
@@ -315,8 +307,7 @@ namespace {
return value;
}
- api::StorageMessageAddress createAddress(const std::string& clusterName, uint16_t node)
- {
+ api::StorageMessageAddress createAddress(const std::string& clusterName, uint16_t node) {
return api::StorageMessageAddress(clusterName, lib::NodeType::STORAGE, node);
}
@@ -351,10 +342,9 @@ MergeHandler::fetchLocalData(
// Preload documents in memory
std::vector<spi::Timestamp> slots;
uint32_t alreadyFilled = 0;
- for (uint32_t i=0, n=diff.size(); i<n; ++i) {
- api::ApplyBucketDiffCommand::Entry& e(diff[i]);
+ for (const auto& e : diff) {
if ((e._entry._hasMask & nodeMask) != 0 && !e.filled()) {
- slots.push_back(spi::Timestamp(e._entry._timestamp));
+ slots.emplace_back(e._entry._timestamp);
}
if (e.filled()) {
alreadyFilled += e._headerBlob.size() + e._bodyBlob.size();
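slots.emplace_back(e._entry._timestamp) constructs the spi::Timestamp directly in the vector's storage; with an explicit constructor, the old push_back needed a spelled-out temporary. A minimal sketch, assuming Timestamp is a thin explicit wrapper (the real spi::Timestamp may differ):

    #include <cstdint>
    #include <vector>

    struct Timestamp {                         // hypothetical stand-in for spi::Timestamp
        explicit Timestamp(uint64_t v) : value(v) {}
        uint64_t value;
    };

    int main() {
        std::vector<Timestamp> slots;
        uint64_t raw = 12345;
        slots.push_back(Timestamp(raw));       // explicit temporary, then moved in
        slots.emplace_back(raw);               // constructed in place from the raw value
        return slots.size() == 2 ? 0 : 1;
    }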
@@ -406,18 +396,18 @@ MergeHandler::fetchLocalData(
throw std::runtime_error(ss.str());
}
auto list = result.steal_entries();
- for (size_t i = 0; i < list.size(); ++i) {
- if (list[i]->getSize() <= remainingSize
+ for (auto& entry : list) {
+ if (entry->getSize() <= remainingSize
|| (entries.empty() && alreadyFilled == 0))
{
- remainingSize -= list[i]->getSize();
+ remainingSize -= entry->getSize();
LOG(spam, "Added %s, remainingSize is %u",
entries.back()->toString().c_str(), remainingSize);
- entries.push_back(std::move(list[i]));
+ entries.push_back(std::move(entry));
} else {
LOG(spam, "Adding %s would exceed chunk size limit of %u; "
"not filling up any more diffs for current round",
- list[i]->toString().c_str(), _maxChunkSize);
+ entry->toString().c_str(), _maxChunkSize);
chunkLimitReached = true;
break;
}
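Ordering matters in the loop above: the entry is pushed before entries.back() is logged, and after std::move(entry) the slot left behind in list is an empty unique_ptr. A short sketch of those move semantics:

    #include <cassert>
    #include <memory>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::unique_ptr<std::string>> list;
        list.push_back(std::make_unique<std::string>("doc1"));

        std::vector<std::unique_ptr<std::string>> entries;
        for (auto& entry : list) {               // non-const: we move out of the source
            entries.push_back(std::move(entry)); // push first...
            assert(*entries.back() == "doc1");   // ...then back() names the new element
        }
        assert(list[0] == nullptr);              // moved-from unique_ptr is empty
    }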
@@ -432,22 +422,20 @@ MergeHandler::fetchLocalData(
document::BucketIdFactory idFactory;
- for (size_t i=0; i<entries.size(); ++i) {
- const spi::DocEntry& docEntry(*entries[i]);
- LOG(spam, "fetchLocalData: processing %s",
- docEntry.toString().c_str());
+ for (const auto& entry_ptr : entries) {
+ const auto& docEntry = *entry_ptr;
+ LOG(spam, "fetchLocalData: processing %s", docEntry.toString().c_str());
- std::vector<api::ApplyBucketDiffCommand::Entry>::iterator iter(
- std::lower_bound(diff.begin(), diff.end(),
- api::Timestamp(docEntry.getTimestamp()),
- DiffEntryTimestampPredicate()));
+ auto iter = std::lower_bound(diff.begin(), diff.end(),
+ api::Timestamp(docEntry.getTimestamp()),
+ DiffEntryTimestampPredicate());
assert(iter != diff.end());
assert(iter->_entry._timestamp == docEntry.getTimestamp());
api::ApplyBucketDiffCommand::Entry& e(*iter);
if (!docEntry.isRemove()) {
const Document* doc = docEntry.getDocument();
- assert(doc != 0);
+ assert(doc != nullptr);
assertContainedInBucket(doc->getId(), bucket, idFactory);
e._docName = doc->getId().toString();
vespalib::nbostream stream;
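The std::lower_bound call requires diff to be sorted by timestamp; it binary-searches for the first entry whose timestamp is not less than the document's, and the asserts then pin down an exact match. A sketch with a simplified entry and comparator:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct DiffEntry {
        uint64_t _timestamp;
    };

    // Heterogeneous comparator: entry on the left, bare timestamp on the right.
    struct DiffEntryTimestampPredicate {
        bool operator()(const DiffEntry& e, uint64_t ts) const {
            return e._timestamp < ts;
        }
    };

    int main() {
        std::vector<DiffEntry> diff{{100}, {200}, {300}};  // sorted by timestamp
        auto iter = std::lower_bound(diff.begin(), diff.end(), uint64_t{200},
                                     DiffEntryTimestampPredicate());
        assert(iter != diff.end() && iter->_timestamp == 200);
    }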
@@ -457,7 +445,7 @@ MergeHandler::fetchLocalData(
e._bodyBlob.clear();
} else {
const DocumentId* docId = docEntry.getDocumentId();
- assert(docId != 0);
+ assert(docId != nullptr);
assertContainedInBucket(*docId, bucket, idFactory);
if (e._entry._flags & DELETED) {
e._docName = docId->toString();
@@ -470,8 +458,7 @@ MergeHandler::fetchLocalData(
e._repo = _env._repo.get();
}
- for (size_t i=0; i<diff.size(); ++i) {
- api::ApplyBucketDiffCommand::Entry& e(diff[i]);
+ for (auto& e : diff) {
if ((e._entry._hasMask & nodeMask) == 0 || e.filled()) {
continue;
}
@@ -587,8 +574,7 @@ MergeHandler::applyDiffLocally(
bucket.toString().c_str(), e.toString().c_str());
applyDiffEntry(bucket, e, context, *repo);
} else {
- assert(spi::Timestamp(e._entry._timestamp)
- == existing.getTimestamp());
+ assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
// Diffing for existing timestamp; should either both be put
// dupes (which is a common case) or the new entry should be an
// unrevertable remove.
@@ -671,24 +657,22 @@ namespace {
uint32_t maxSize, api::ApplyBucketDiffCommand& cmd)
{
uint32_t chunkSize = 0;
- for (std::deque<api::GetBucketDiffCommand::Entry>::const_iterator it
- = status.diff.begin(); it != status.diff.end(); ++it)
- {
- if (constrictHasMask && it->_hasMask != hasMask) {
+ for (const auto& entry : status.diff) {
+ if (constrictHasMask && entry._hasMask != hasMask) {
continue;
}
if (chunkSize != 0 &&
- chunkSize + it->_bodySize + it->_headerSize > maxSize)
+ chunkSize + entry._bodySize + entry._headerSize > maxSize)
{
LOG(spam, "Merge of %s used %d bytes, max is %d. Will "
"fetch in next merge round.",
id.toString().c_str(),
- chunkSize + it->_bodySize + it->_headerSize,
+ chunkSize + entry._bodySize + entry._headerSize,
maxSize);
break;
}
- chunkSize += it->_bodySize + it->_headerSize;
- cmd.getDiff().push_back(api::ApplyBucketDiffCommand::Entry(*it));
+ chunkSize += entry._bodySize + entry._headerSize;
+ cmd.getDiff().emplace_back(entry);
if (constrictHasMask) {
cmd.getDiff().back()._entry._hasMask = newHasMask;
}
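findCandidates accumulates diff entries until the next one would push the chunk past maxSize; the chunkSize != 0 guard always admits the first matching entry, so even a single oversized document keeps the merge progressing. A simplified sketch (the hasMask filtering is omitted):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct Entry {
        uint32_t _headerSize;
        uint32_t _bodySize;
    };

    std::vector<Entry> findCandidates(const std::vector<Entry>& diff, uint32_t maxSize) {
        std::vector<Entry> out;
        uint32_t chunkSize = 0;
        for (const auto& entry : diff) {
            if (chunkSize != 0 &&
                chunkSize + entry._bodySize + entry._headerSize > maxSize) {
                break;  // the rest is fetched in the next merge round
            }
            chunkSize += entry._bodySize + entry._headerSize;
            out.push_back(entry);
        }
        return out;
    }

    int main() {
        std::vector<Entry> diff{{10, 90}, {20, 80}, {50, 50}};
        auto picked = findCandidates(diff, 150);  // 100 fits; 100+100 > 150 stops the loop
        std::printf("picked %zu of %zu entries\n", picked.size(), diff.size());
    }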
@@ -710,7 +694,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
}
// If nothing to update, we're done.
- if (status.diff.size() == 0) {
+ if (status.diff.empty()) {
LOG(debug, "Done with merge of %s. No more entries in diff.", bucket.toString().c_str());
return status.reply;
}
@@ -723,9 +707,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// merge.
while (status.nodeList.back().sourceOnly) {
std::vector<api::MergeBucketCommand::Node> nodes;
- for (uint16_t i=0; i<status.nodeList.size(); ++i) {
- if (!status.nodeList[i].sourceOnly) {
- nodes.push_back(status.nodeList[i]);
+ for (const auto& node : status.nodeList) {
+ if (!node.sourceOnly) {
+ nodes.emplace_back(node);
}
}
nodes.push_back(status.nodeList.back());
@@ -735,7 +719,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// data to fetch parameter will control amount added.
uint32_t maxSize =
(_env._config.enableMergeLocalNodeChooseDocsOptimalization
- ? std::numeric_limits<uint32_t>().max()
+ ? std::numeric_limits<uint32_t>::max()
: _maxChunkSize);
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
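The std::numeric_limits<uint32_t>().max() form default-constructs a temporary and calls the static member through it; the new ::max() names it directly. Both yield the same value:

    #include <cassert>
    #include <cstdint>
    #include <limits>

    int main() {
        uint32_t a = std::numeric_limits<uint32_t>().max();  // via a temporary instance
        uint32_t b = std::numeric_limits<uint32_t>::max();   // idiomatic static access
        assert(a == b && b == 0xFFFFFFFFu);
    }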
@@ -749,17 +733,15 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
*cmd);
if (cmd->getDiff().size() != 0) break;
cmd.reset();
- // If we found no data to merge from the last source only node,
- // remove it and retry. (Clear it out of the hasmask such that we
- // can match hasmask with operator==)
+ // If we found no data to merge from the last source only node,
+ // remove it and retry. (Clear it out of the hasmask such that we
+ // can match hasmask with operator==)
status.nodeList.pop_back();
uint16_t mask = ~(1 << status.nodeList.size());
- for (std::deque<api::GetBucketDiffCommand::Entry>::iterator it
- = status.diff.begin(); it != status.diff.end(); ++it)
- {
- it->_hasMask &= mask;
+ for (auto& e : status.diff) {
+ e._hasMask &= mask;
}
- // If only one node left in the merge, return ok.
+ // If only one node left in the merge, return ok.
if (status.nodeList.size() == 1) {
LOG(debug, "Done with merge of %s as there is only one node "
"that is not source only left in the merge.",
@@ -767,40 +749,36 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
return status.reply;
}
}
- // If we did not have a source only node, check if we have a path with
- // many documents within it that we'll merge separately
- if (cmd.get() == 0) {
+ // If we did not have a source only node, check if we have a path with
+ // many documents within it that we'll merge separately
+ if (!cmd) {
std::map<uint16_t, uint32_t> counts;
- for (std::deque<api::GetBucketDiffCommand::Entry>::const_iterator it
- = status.diff.begin(); it != status.diff.end(); ++it)
- {
- ++counts[it->_hasMask];
+ for (const auto& e : status.diff) {
+ ++counts[e._hasMask];
}
- for (std::map<uint16_t, uint32_t>::const_iterator it = counts.begin();
- it != counts.end(); ++it)
- {
- if (it->second >= uint32_t(_env._config.commonMergeChainOptimalizationMinimumSize)
+ for (const auto& e : counts) {
+ if (e.second >= uint32_t(_env._config.commonMergeChainOptimalizationMinimumSize)
|| counts.size() == 1)
{
LOG(spam, "Sending separate apply bucket diff for path %x "
"with size %u",
- it->first, it->second);
+ e.first, e.second);
std::vector<api::MergeBucketCommand::Node> nodes;
- // This node always has to be first in chain.
+ // This node always has to be first in chain.
nodes.push_back(status.nodeList[0]);
- // Add all the nodes that lack the docs in question
- for (uint16_t i=1; i<status.nodeList.size(); ++i) {
- if ((it->first & (1 << i)) == 0) {
+ // Add all the nodes that lack the docs in question
+ for (uint16_t i = 1; i < status.nodeList.size(); ++i) {
+ if ((e.first & (1 << i)) == 0) {
nodes.push_back(status.nodeList[i]);
}
}
uint16_t newMask = 1;
- // If this node doesn't have the docs, add a node that has
- // them to the end of the chain, so the data is applied
- // going back.
- if ((it->first & 1) == 0) {
+ // If this node doesn't have the docs, add a node that has
+ // them to the end of the chain, so the data is applied
+ // going back.
+ if ((e.first & 1) == 0) {
for (uint16_t i=1; i<status.nodeList.size(); ++i) {
- if ((it->first & (1 << i)) != 0) {
+ if ((e.first & (1 << i)) != 0) {
nodes.push_back(status.nodeList[i]);
break;
}
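With no source-only node forcing a separate pass, the handler buckets diff entries by their exact hasMask and sends a dedicated ApplyBucketDiff for any sufficiently common path. A sketch of that frequency count, with a hypothetical threshold in place of the config value:

    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <vector>

    int main() {
        // hasMask per diff entry: which nodes already hold it.
        std::vector<uint16_t> diffMasks{0b001, 0b001, 0b001, 0b011, 0b001};
        std::map<uint16_t, uint32_t> counts;
        for (auto m : diffMasks) {
            ++counts[m];
        }
        const uint32_t minimumSize = 3;  // stand-in for commonMergeChainOptimalizationMinimumSize
        for (const auto& [mask, count] : counts) {
            if (count >= minimumSize || counts.size() == 1) {
                std::printf("merge path 0x%x separately (%u entries)\n", mask, count);
                break;  // the real code builds one ApplyBucketDiffCommand and stops
            }
        }
    }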
@@ -810,13 +788,13 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
assert(nodes.size() > 1);
uint32_t maxSize =
(_env._config.enableMergeLocalNodeChooseDocsOptimalization
- ? std::numeric_limits<uint32_t>().max()
+ ? std::numeric_limits<uint32_t>::max()
: _maxChunkSize);
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
cmd->setAddress(createAddress(_env._component.getClusterName(), nodes[1].index));
- // Add all the metadata, and thus use big limit. Max
- // data to fetch parameter will control amount added.
- findCandidates(bucket.getBucketId(), status, true, it->first, newMask, maxSize, *cmd);
+ // Add all the metadata, and thus use big limit. Max
+ // data to fetch parameter will control amount added.
+ findCandidates(bucket.getBucketId(), status, true, e.first, newMask, maxSize, *cmd);
break;
}
}
@@ -882,7 +860,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
// Verify that first node is not source only, and that all source only
// nodes are at end of chain
- for (uint16_t i=0; i<cmd.getNodes().size(); ++i) {
+ for (uint16_t i = 0; i < cmd.getNodes().size(); ++i) {
if (i == 0) {
if (cmd.getNodes()[i].sourceOnly) {
tracker->fail(ReturnCode::ILLEGAL_PARAMETERS,
@@ -966,8 +944,9 @@ namespace {
api::GetBucketDiffCommand::Entry, bool>
{
bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const
- { return (x._timestamp < y._timestamp); }
+ const api::GetBucketDiffCommand::Entry& y) const {
+ return (x._timestamp < y._timestamp);
+ }
};
/**
@@ -1109,15 +1088,15 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker
if (index + 1u >= cmd.getNodes().size()) {
// Remove entries everyone has from list first.
uint16_t completeMask = 0;
- for (uint32_t i=0; i<cmd.getNodes().size(); ++i) {
+ for (uint32_t i = 0; i < cmd.getNodes().size(); ++i) {
if (!cmd.getNodes()[i].sourceOnly) {
completeMask |= (1 << i);
}
}
std::vector<api::GetBucketDiffCommand::Entry> final;
- for (uint32_t i=0, n=local.size(); i<n; ++i) {
- if ((local[i]._hasMask & completeMask) != completeMask) {
- final.push_back(local[i]);
+ for (const auto& e : local) {
+ if ((e._hasMask & completeMask) != completeMask) {
+ final.push_back(e);
}
}
// Send reply
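Once the diff has travelled the whole node chain, entries that every non-source-only node already holds can be dropped before replying. A sketch of the completeMask filter with a simplified entry type:

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct Entry {
        uint16_t _hasMask;
    };

    int main() {
        uint16_t completeMask = 0b0111;  // one bit per non-source-only node (0..2)
        std::vector<Entry> local{{0b0111}, {0b0011}, {0b0101}};

        std::vector<Entry> kept;  // entries at least one node still lacks
        for (const auto& e : local) {
            if ((e._hasMask & completeMask) != completeMask) {
                kept.push_back(e);
            }
        }
        std::printf("kept %zu of %zu entries\n", kept.size(), local.size());  // 2 of 3
    }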
@@ -1308,11 +1287,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
}
std::vector<api::ApplyBucketDiffCommand::Entry>& local(cmd.getDiff());
- for (uint32_t i=0, n=local.size(); i<n; ++i) {
- if ((local[i]._entry._hasMask & completeMask) == completeMask) {
- local[i]._headerBlob.clear();
- local[i]._bodyBlob.clear();
- local[i]._docName.clear();
+ for (auto& e : local) {
+ if ((e._entry._hasMask & completeMask) == completeMask) {
+ e._headerBlob.clear();
+ e._bodyBlob.clear();
+ e._docName.clear();
}
}