diff options
Diffstat (limited to 'storage/src/vespa/storage/persistence/mergehandler.cpp')
-rw-r--r-- | storage/src/vespa/storage/persistence/mergehandler.cpp | 97 |
1 files changed, 34 insertions, 63 deletions
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index b1e36147e30..7ee2d9f37bf 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -27,15 +27,13 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, - uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize) + uint32_t maxChunkSize) : _clock(clock), _cluster_context(cluster_context), _env(env), _spi(spi), _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), - _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), _executor(executor) { } @@ -48,6 +46,8 @@ MergeHandler::~MergeHandler() namespace { +constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u; + constexpr int getDeleteFlag() { // Referred into old slotfile code before. Where should this number come from? return 2; @@ -175,7 +175,7 @@ MergeHandler::buildBucketInfoList( std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const { - assert(output.size() == 0); + assert(output.empty()); assert(myNodeIndex < 16); uint32_t oldSize = output.size(); using DbBucketInfo = api::BucketInfo; @@ -487,13 +487,12 @@ MergeHandler::fetchLocalData( } document::Document::UP -MergeHandler::deserializeDiffDocument( - const api::ApplyBucketDiffCommand::Entry& e, - const document::DocumentTypeRepo& repo) const +MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, + const document::DocumentTypeRepo& repo) const { auto doc = std::make_unique<document::Document>(); vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size()); - if (e._bodyBlob.size() > 0) { + if (!e._bodyBlob.empty()) { // TODO Remove this branch and add warning on error. vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size()); doc->deserialize(repo, hbuf, bbuf); @@ -533,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results * Apply the diffs needed locally. */ void -MergeHandler::applyDiffLocally( - const spi::Bucket& bucket, - std::vector<api::ApplyBucketDiffCommand::Entry>& diff, - uint8_t nodeIndex, - spi::Context& context, - std::shared_ptr<ApplyBucketDiffState> async_results) const +MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + uint8_t nodeIndex, spi::Context& context, + const std::shared_ptr<ApplyBucketDiffState> & async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", - bucket.toString().c_str(), - diff.size()); + bucket.toString().c_str(), diff.size()); uint32_t nodeMask = 1 << nodeIndex; uint32_t byteCount = 0; uint32_t addedCount = 0; @@ -563,9 +558,8 @@ MergeHandler::applyDiffLocally( if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) { ++j; - LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and " - "already present in persistence", bucket.toString().c_str(), - existing.toString().c_str()); + LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence", + bucket.toString().c_str(), existing.toString().c_str()); continue; } if ((e._entry._hasMask & nodeMask) != 0) { @@ -576,8 +570,7 @@ MergeHandler::applyDiffLocally( } if (!e.filled()) { ++i; - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -596,19 +589,14 @@ MergeHandler::applyDiffLocally( ++i; ++j; if ((e._entry._flags & DELETED) && !existing.isRemove()) { - LOG(debug, "Slot in diff is remove for existing " - "timestamp in %s. Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); applyDiffEntry(async_results, bucket, e, repo); } else { // Duplicate put, just ignore it. - LOG(debug, "During diff apply, attempting to add slot " - "whose timestamp already exists in %s, but assuming " - "these are for the same entry--ignoring it. " - "Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, " + " but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); } continue; } @@ -623,8 +611,7 @@ MergeHandler::applyDiffLocally( continue; } if (!e.filled()) { - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -650,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket)); if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) { LOG(warning, "Failed to get bucket info for %s: %s", - bucket.toString().c_str(), - infoResult.getErrorMessage().c_str()); - throw std::runtime_error("Failed to invoke getBucketInfo on " - "persistence provider"); + bucket.toString().c_str(), infoResult.getErrorMessage().c_str()); + throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider"); } const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo()); - api::BucketInfo providerInfo(tmpInfo.getChecksum(), - tmpInfo.getDocumentCount(), - tmpInfo.getDocumentSize(), - tmpInfo.getEntryCount(), - tmpInfo.getUsedSize(), - tmpInfo.isReady(), - tmpInfo.isActive()); + api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(), + tmpInfo.getEntryCount(), tmpInfo.getUsedSize(), + tmpInfo.isReady(), tmpInfo.isActive()); _env.updateBucketDatabase(bucket.getBucket(), providerInfo); } @@ -698,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { LOG(warning, "Done with merge of %s (failed: %s) %s", - bucket.toString().c_str(), - status.reply->getResult().toString().c_str(), - status.toString().c_str()); + bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str()); return status.reply; } @@ -732,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index)); - findCandidates(status, - active_nodes_mask, - true, - 1 << (status.nodeList.size() - 1), - 1 << (nodes.size() - 1), - *cmd); - if (cmd->getDiff().size() != 0) { + findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1), + 1 << (nodes.size() - 1), *cmd); + if (!cmd->getDiff().empty()) { break; } cmd.reset(); @@ -748,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, active_nodes_mask = (1u << status.nodeList.size()) - 1; // If only one node left in the merge, return ok. if (status.nodeList.size() == 1) { - LOG(debug, "Done with merge of %s as there is only one node " - "that is not source only left in the merge.", + LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.", bucket.toString().c_str()); return status.reply; } @@ -777,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, if (e.first == 0u) { continue; } - if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize) - || counts.size() == 1) - { - LOG(spam, "Sending separate apply bucket diff for path %x " - "with size %u", + if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) { + LOG(spam, "Sending separate apply bucket diff for path %x with size %u", e.first, e.second); std::vector<api::MergeBucketCommand::Node> nodes; // This node always has to be first in chain. @@ -837,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, status.pendingId = cmd->getMsgId(); LOG(debug, "Sending %s", cmd->toString().c_str()); sender.sendCommand(cmd); - return api::StorageReply::SP(); + return {}; } /** Ensures merge states are deleted if we fail operation */ @@ -1203,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s->diff.size() == 0); + assert(s->diff.empty()); s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); |