about summary refs log tree commit diff stats
path: root/storage/src/vespa/storage/persistence/mergehandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storage/persistence/mergehandler.cpp')
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.cpp | 97
1 files changed, 34 insertions, 63 deletions
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index b1e36147e30..7ee2d9f37bf 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -27,15 +27,13 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
- uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize)
+ uint32_t maxChunkSize)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
_spi(spi),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
- _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
_executor(executor)
{
}
@@ -48,6 +46,8 @@ MergeHandler::~MergeHandler()
namespace {
+constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;
+
constexpr int getDeleteFlag() {
// Referred into old slotfile code before. Where should this number come from?
return 2;
@@ -175,7 +175,7 @@ MergeHandler::buildBucketInfoList(
std::vector<api::GetBucketDiffCommand::Entry>& output,
spi::Context& context) const
{
- assert(output.size() == 0);
+ assert(output.empty());
assert(myNodeIndex < 16);
uint32_t oldSize = output.size();
using DbBucketInfo = api::BucketInfo;
@@ -487,13 +487,12 @@ MergeHandler::fetchLocalData(
}
document::Document::UP
-MergeHandler::deserializeDiffDocument(
- const api::ApplyBucketDiffCommand::Entry& e,
- const document::DocumentTypeRepo& repo) const
+MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e,
+ const document::DocumentTypeRepo& repo) const
{
auto doc = std::make_unique<document::Document>();
vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size());
- if (e._bodyBlob.size() > 0) {
+ if (!e._bodyBlob.empty()) {
// TODO Remove this branch and add warning on error.
vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size());
doc->deserialize(repo, hbuf, bbuf);
@@ -533,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
* Apply the diffs needed locally.
*/
void
-MergeHandler::applyDiffLocally(
- const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState> async_results) const
+MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context,
+ const std::shared_ptr<ApplyBucketDiffState> & async_results) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
- bucket.toString().c_str(),
- diff.size());
+ bucket.toString().c_str(), diff.size());
uint32_t nodeMask = 1 << nodeIndex;
uint32_t byteCount = 0;
uint32_t addedCount = 0;
@@ -563,9 +558,8 @@ MergeHandler::applyDiffLocally(
if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) {
++j;
- LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and "
- "already present in persistence", bucket.toString().c_str(),
- existing.toString().c_str());
+ LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence",
+ bucket.toString().c_str(), existing.toString().c_str());
continue;
}
if ((e._entry._hasMask & nodeMask) != 0) {
@@ -576,8 +570,7 @@ MergeHandler::applyDiffLocally(
}
if (!e.filled()) {
++i;
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -596,19 +589,14 @@ MergeHandler::applyDiffLocally(
++i;
++j;
if ((e._entry._flags & DELETED) && !existing.isRemove()) {
- LOG(debug, "Slot in diff is remove for existing "
- "timestamp in %s. Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
applyDiffEntry(async_results, bucket, e, repo);
} else {
// Duplicate put, just ignore it.
- LOG(debug, "During diff apply, attempting to add slot "
- "whose timestamp already exists in %s, but assuming "
- "these are for the same entry--ignoring it. "
- "Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
+ "but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
}
continue;
}
@@ -623,8 +611,7 @@ MergeHandler::applyDiffLocally(
continue;
}
if (!e.filled()) {
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -650,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
LOG(warning, "Failed to get bucket info for %s: %s",
- bucket.toString().c_str(),
- infoResult.getErrorMessage().c_str());
- throw std::runtime_error("Failed to invoke getBucketInfo on "
- "persistence provider");
+ bucket.toString().c_str(), infoResult.getErrorMessage().c_str());
+ throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider");
}
const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo());
- api::BucketInfo providerInfo(tmpInfo.getChecksum(),
- tmpInfo.getDocumentCount(),
- tmpInfo.getDocumentSize(),
- tmpInfo.getEntryCount(),
- tmpInfo.getUsedSize(),
- tmpInfo.isReady(),
- tmpInfo.isActive());
+ api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(),
+ tmpInfo.getEntryCount(), tmpInfo.getUsedSize(),
+ tmpInfo.isReady(), tmpInfo.isActive());
_env.updateBucketDatabase(bucket.getBucket(), providerInfo);
}
@@ -698,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
LOG(warning, "Done with merge of %s (failed: %s) %s",
- bucket.toString().c_str(),
- status.reply->getResult().toString().c_str(),
- status.toString().c_str());
+ bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str());
return status.reply;
}
@@ -732,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index));
- findCandidates(status,
- active_nodes_mask,
- true,
- 1 << (status.nodeList.size() - 1),
- 1 << (nodes.size() - 1),
- *cmd);
- if (cmd->getDiff().size() != 0) {
+ findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1),
+ 1 << (nodes.size() - 1), *cmd);
+ if (!cmd->getDiff().empty()) {
break;
}
cmd.reset();
@@ -748,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
active_nodes_mask = (1u << status.nodeList.size()) - 1;
// If only one node left in the merge, return ok.
if (status.nodeList.size() == 1) {
- LOG(debug, "Done with merge of %s as there is only one node "
- "that is not source only left in the merge.",
+ LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.",
bucket.toString().c_str());
return status.reply;
}
@@ -777,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
if (e.first == 0u) {
continue;
}
- if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
- || counts.size() == 1)
- {
- LOG(spam, "Sending separate apply bucket diff for path %x "
- "with size %u",
+ if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) {
+ LOG(spam, "Sending separate apply bucket diff for path %x with size %u",
e.first, e.second);
std::vector<api::MergeBucketCommand::Node> nodes;
// This node always has to be first in chain.
@@ -837,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
status.pendingId = cmd->getMsgId();
LOG(debug, "Sending %s", cmd->toString().c_str());
sender.sendCommand(cmd);
- return api::StorageReply::SP();
+ return {};
}
/** Ensures merge states are deleted if we fail operation */
@@ -1203,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s->diff.size() == 0);
+ assert(s->diff.empty());
s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());